Browse Source

add admin RPC find call, add kademlia NodeLookup interactive

master
arnaucube 5 years ago
parent
commit
ff0ba3995a
13 changed files with 264 additions and 22 deletions
  1. +31
    -1
      README.md
  2. +9
    -0
      cmd/cmd.go
  3. +1
    -0
      config.test0.yaml
  4. +1
    -0
      config.test1.yaml
  5. +1
    -0
      config.test2.yaml
  6. +9
    -0
      config.test3.yaml
  7. +1
    -0
      config/config.go
  8. +80
    -2
      kademlia/kademlia.go
  9. +3
    -3
      kademlia/kademlia_test.go
  10. +78
    -0
      node/admin.go
  11. +14
    -4
      node/node.go
  12. +31
    -12
      rpc-test/test.go
  13. +5
    -0
      run-dev-nodes.sh

+ 31
- 1
README.md

@ -12,7 +12,37 @@ To run a node:
go run main.go --config config.test0.yaml --debug start go run main.go --config config.test0.yaml --debug start
``` ```
To run 3 test nodes inside a tmux session:
## Test
- Scenario:
```
+--+ +--+
|n0+-----------+n1|
+-++ +--+
|
|
| +--+ +--+
+----+n2+-----------+n3|
+--+ +--+
```
- To run 4 test nodes inside a tmux session:
``` ```
bash run-dev-nodes.sh bash run-dev-nodes.sh
``` ```
Using the `test.go` in the `rpc-test` directory:
- calls to the node to perform lookups
- `go run test.go -find`
- performs an `admin` call to `Find` node, to the `n0`, asking about the `n3`
- calls to simulate kademlia protocol rpc calls
- `go run test.go -ping`
- performs the `PING` call
- `go run test.go -findnode`
- performs the `FIND_NODE` call
- `go run test.go -findvalue`
- performs the `FIND_VALUE` call
- `go run test.go -store`
- performs the `STORE` call

+ 9
- 0
cmd/cmd.go

@ -43,6 +43,15 @@ func cmdStart(c *cli.Context) error {
} }
log.Info("New node created with ID: ", n.ID()) log.Info("New node created with ID: ", n.ID())
} }
go func() {
admin := node.NewAdmin(n)
err := admin.Start()
if err != nil {
panic(err)
}
}()
err = n.Start() err = n.Start()
return err return err
} }

+ 1
- 0
config.test0.yaml

@ -1,4 +1,5 @@
id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7" id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7"
addr: 127.0.0.1 addr: 127.0.0.1
port: 5000 port: 5000
adminport: 6000
storage: "tmp" storage: "tmp"

+ 1
- 0
config.test1.yaml

@ -1,6 +1,7 @@
id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d5" id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d5"
addr: 127.0.0.1 addr: 127.0.0.1
port: 5001 port: 5001
adminport: 6001
knownNodes: knownNodes:
- id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7" - id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7"
addr: 127.0.0.1 addr: 127.0.0.1

+ 1
- 0
config.test2.yaml

@ -1,6 +1,7 @@
id: "1ff734fb9897600ca54a9c55ace2d22a51afb610" id: "1ff734fb9897600ca54a9c55ace2d22a51afb610"
addr: 127.0.0.1 addr: 127.0.0.1
port: 5002 port: 5002
adminport: 6002
knownNodes: knownNodes:
- id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7" - id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7"
addr: 127.0.0.1 addr: 127.0.0.1

+ 9
- 0
config.test3.yaml

@ -0,0 +1,9 @@
id: "c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b"
addr: 127.0.0.1
port: 5003
adminport: 6003
knownNodes:
- id: "1ff734fb9897600ca54a9c55ace2d22a51afb610"
addr: 127.0.0.1
port: 5002
storage: "tmp"

+ 1
- 0
config/config.go

@ -11,6 +11,7 @@ type Config struct {
ID string ID string
Addr string Addr string
Port string Port string
AdminPort string
KnownNodesStr []KnownNodeStr `mapstructure:"knownnodes"` KnownNodesStr []KnownNodeStr `mapstructure:"knownnodes"`
KnownNodes []kademlia.ListedNode `mapstructure:"-"` KnownNodes []kademlia.ListedNode `mapstructure:"-"`
Storage string Storage string

+ 80
- 2
kademlia/kademlia.go

@ -2,6 +2,7 @@ package kademlia
import ( import (
"errors" "errors"
"fmt"
"math/bits" "math/bits"
"net/rpc" "net/rpc"
"strconv" "strconv"
@ -50,8 +51,12 @@ func (kad Kademlia) String() string {
return r return r
} }
func (kad Kademlia) FindClosestKBucket(id ID) (int, error) {
func (kad Kademlia) GetClosestKBucket(id ID) (int, error) {
kb := kad.KBucket(id) kb := kad.KBucket(id)
if kb == 0 {
// is this node
return kb, nil
}
if len(kad.KBuckets[kb]) != 0 { if len(kad.KBuckets[kb]) != 0 {
return kb, nil return kb, nil
} }
@ -118,7 +123,7 @@ func (kad *Kademlia) Update(o ListedNode) {
} }
func (kad *Kademlia) PingOldNode(k int, o ListedNode) { func (kad *Kademlia) PingOldNode(k int, o ListedNode) {
// TODO when rpc layer is done
// TODO
// ping the n.KBuckets[k][0] (using goroutine) // ping the n.KBuckets[k][0] (using goroutine)
// if no response (timeout), delete it and add 'o' // if no response (timeout), delete it and add 'o'
// n.KBuckets[k][0] = o // n.KBuckets[k][0] = o
@ -177,3 +182,76 @@ func moveToBottom(kb []ListedNode, k int) []ListedNode {
kb = append(kb[:], e) kb = append(kb[:], e)
return kb return kb
} }
func removeFromListedNodes(lns []ListedNode, k int) []ListedNode {
lns = append(lns[:k], lns[k+1:]...)
return lns
}
func (kad Kademlia) CallFindNode(id ID, o ListedNode) ([]ListedNode, error) {
client, err := rpc.DialHTTP("tcp", o.Addr+":"+o.Port)
if err != nil {
return nil, err
}
var lns []ListedNode
err = client.Call("Node.FindNode", id, &lns)
if err != nil {
return nil, err
}
return lns, nil
}
func (kad Kademlia) NodeLookup(id ID) ([]ListedNode, error) {
log.Debug("[NodeLookup] get closest KBucket for ", id)
// find closest kbucket for this current node
k, err := kad.GetClosestKBucket(id)
if err != nil {
return []ListedNode{}, fmt.Errorf("No KBuckets")
}
log.Debug("[NodeLookup] closest KBucket", k)
var closest ListedNode
var newClosest ListedNode
closest = kad.N
var lns []ListedNode
lns = kad.KBuckets[k]
for !closest.ID.Equal(newClosest.ID) { // TODO
// call each ListedNode from the kbucket, asking for their list of closest kbucket
var newLns []ListedNode
closest = newClosest
for k, ln := range lns {
// TODO TMP for the moment is synchronous
fmt.Println("calling node", ln)
flns, err := kad.CallFindNode(id, ln)
if err != nil {
log.Debug(err)
}
removeFromListedNodes(lns, k)
// newLns = append(newLns, flns...)
for _, fln := range flns {
exist, _ := existsInListedNodes(newLns, fln)
if !exist {
fmt.Println("ln", fln.ID, " not exists")
newLns = append(newLns, fln)
}
}
}
lns = newLns
newClosest = updateClosest(id, lns, closest)
}
return lns, nil
}
func updateClosest(id ID, lns []ListedNode, closest ListedNode) ListedNode {
d := id.Distance(closest.ID)
closestZ := countZeroes(d[:])
for _, ln := range lns {
d := id.Distance(ln.ID)
z := countZeroes(d[:])
if z < closestZ {
closest = ln
closestZ = z
}
}
return closest
}

+ 3
- 3
kademlia/kademlia_test.go

@ -152,7 +152,7 @@ func TestFindClosestKBucket(t *testing.T) {
idB, err := IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a18d5") idB, err := IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a18d5")
assert.Nil(t, err) assert.Nil(t, err)
k, err := kademlia.FindClosestKBucket(idB)
k, err := kademlia.GetClosestKBucket(idB)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 2, k) assert.Equal(t, 2, k)
@ -164,7 +164,7 @@ func TestFindClosestKBucket(t *testing.T) {
assert.Equal(t, 3, k) assert.Equal(t, 3, k)
// while the real KBucket (as the 3 is empty), should be 2 // while the real KBucket (as the 3 is empty), should be 2
k, err = kademlia.FindClosestKBucket(idB)
k, err = kademlia.GetClosestKBucket(idB)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 2, k) assert.Equal(t, 2, k)
@ -176,7 +176,7 @@ func TestFindClosestKBucket(t *testing.T) {
assert.Equal(t, 5, k) assert.Equal(t, 5, k)
// while the real KBucket (as the 3 is empty), should be 2 // while the real KBucket (as the 3 is empty), should be 2
k, err = kademlia.FindClosestKBucket(idB)
k, err = kademlia.GetClosestKBucket(idB)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 7, k) assert.Equal(t, 7, k)
} }

+ 78
- 0
node/admin.go

@ -0,0 +1,78 @@
package node
import (
"go-dht/config"
"go-dht/kademlia"
"io/ioutil"
"net"
"net/http"
"net/rpc"
log "github.com/sirupsen/logrus"
)
type Admin struct {
node Node
disc map[kademlia.ID][]kademlia.ListedNode
}
func NewAdmin(node Node) Admin {
return Admin{
node: node,
}
}
func (a *Admin) Start() error {
// rpc server
err := rpc.Register(a)
if err != nil {
return err
}
//
oldMux := http.DefaultServeMux
mux := http.NewServeMux()
http.DefaultServeMux = mux
//
rpc.HandleHTTP()
//
http.DefaultServeMux = oldMux
//
listener, err := net.Listen("tcp", ":"+config.C.AdminPort)
if err != nil {
return err
}
err = http.Serve(listener, nil)
if err != nil {
return err
}
return nil
}
func (a *Admin) Find(id kademlia.ID, lns *[]kademlia.ListedNode) error {
log.Info("[admin-rpc] FIND ", id)
// check if id in current node
_, err := ioutil.ReadFile(config.C.Storage + "/" + id.String())
if err == nil {
*lns = []kademlia.ListedNode{
kademlia.ListedNode{
ID: a.node.ID(),
Addr: config.C.Addr,
Port: config.C.Port,
},
}
log.Info("[admin-rpc] FIND found")
return nil
}
log.Info("[admin-rpc] FIND not in local Node, starting NodeLookup")
rlns, err := a.node.Kademlia().NodeLookup(id)
if err != nil {
log.Debug("[admin-rpc/FIND] ERROR: ", err)
return err
}
*lns = rlns
return nil
}

+ 14
- 4
node/node.go

@ -44,6 +44,10 @@ func (n Node) ID() kademlia.ID {
return n.kademlia.N.ID return n.kademlia.N.ID
} }
func (n Node) Kademlia() kademlia.Kademlia {
return *n.kademlia
}
func (n *Node) Start() error { func (n *Node) Start() error {
// rpc server // rpc server
err := rpc.Register(n) err := rpc.Register(n)
@ -60,11 +64,16 @@ func (n *Node) Start() error {
// TMP in order to print the KBuckets of the node // TMP in order to print the KBuckets of the node
for { for {
fmt.Println(n.kademlia) fmt.Println(n.kademlia)
time.Sleep(5 * time.Second)
time.Sleep(8 * time.Second)
} }
}() }()
go n.pingKnownNodes(config.C.KnownNodes) go n.pingKnownNodes(config.C.KnownNodes)
go n.kademlia.Update(kademlia.ListedNode{
ID: n.ID(),
Addr: config.C.Addr,
Port: config.C.Port,
})
err = http.Serve(listener, nil) err = http.Serve(listener, nil)
if err != nil { if err != nil {
@ -126,11 +135,12 @@ func (n *Node) Store(data []byte, ack *bool) error {
return nil return nil
} }
func (n *Node) FindNode(ln kademlia.ListedNode, lns *[]kademlia.ListedNode) error {
func (n *Node) FindNode(id kademlia.ID, lns *[]kademlia.ListedNode) error {
log.Info("[rpc] FIND_NODE") log.Info("[rpc] FIND_NODE")
// k := n.kademlia.KBucket(ln.ID) // k := n.kademlia.KBucket(ln.ID)
k, err := n.kademlia.FindClosestKBucket(ln.ID)
k, err := n.kademlia.GetClosestKBucket(id)
if err != nil { if err != nil {
log.Debug("[rpc] FIND_NODE ERROR: ", err)
*lns = []kademlia.ListedNode{} *lns = []kademlia.ListedNode{}
return nil return nil
} }
@ -158,7 +168,7 @@ func (n *Node) FindValue(id kademlia.ID, resp *FindValueResp) error {
} }
// k := n.kademlia.KBucket(id) // k := n.kademlia.KBucket(id)
k, err := n.kademlia.FindClosestKBucket(id)
k, err := n.kademlia.GetClosestKBucket(id)
if err != nil { if err != nil {
*resp = FindValueResp{} *resp = FindValueResp{}
return nil return nil

+ 31
- 12
rpc-test/test.go

@ -14,10 +14,15 @@ import (
// Utility to test the Node RPC methods // Utility to test the Node RPC methods
func main() { func main() {
// public rpc
pingFlag := flag.Bool("ping", false, "test Ping") pingFlag := flag.Bool("ping", false, "test Ping")
findnodeFlag := flag.Bool("findnode", false, "test FindNode") findnodeFlag := flag.Bool("findnode", false, "test FindNode")
findvalueFlag := flag.Bool("findvalue", false, "test FindValue") findvalueFlag := flag.Bool("findvalue", false, "test FindValue")
storeFlag := flag.Bool("store", false, "test Store") storeFlag := flag.Bool("store", false, "test Store")
// admin rpc
findFlag := flag.Bool("find", false, "test Find")
flag.Parse() flag.Parse()
if *pingFlag { if *pingFlag {
@ -32,6 +37,9 @@ func main() {
if *storeFlag { if *storeFlag {
testStore() testStore()
} }
if *findFlag {
testFind()
}
} }
func testPing() { func testPing() {
@ -57,18 +65,13 @@ func testFindNode() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
ln := kademlia.ListedNode{
ID: id,
Addr: "",
Port: "",
}
client, err := rpc.DialHTTP("tcp", "127.0.0.1:5000") client, err := rpc.DialHTTP("tcp", "127.0.0.1:5000")
if err != nil { if err != nil {
log.Fatal("Connection error: ", err) log.Fatal("Connection error: ", err)
} }
var reply []kademlia.ListedNode var reply []kademlia.ListedNode
err = client.Call("Node.FindNode", ln, &reply)
err = client.Call("Node.FindNode", id, &reply)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -79,14 +82,9 @@ func testFindNode() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
ln = kademlia.ListedNode{
ID: id,
Addr: "",
Port: "",
}
var reply2 []kademlia.ListedNode var reply2 []kademlia.ListedNode
err = client.Call("Node.FindNode", ln, &reply2)
err = client.Call("Node.FindNode", id, &reply2)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -177,3 +175,24 @@ func prepareTestListedNodes() []kademlia.ListedNode {
} }
return lns return lns
} }
// through admin rpc
func testFind() {
client, err := rpc.DialHTTP("tcp", "127.0.0.1:6000")
if err != nil {
log.Fatal("Connection error: ", err)
}
idStr := "c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b"
// idStr := "1ff734fb9897600ca54a9c55ace2d22a51afb610"
id, err := kademlia.IDFromString(idStr)
if err != nil {
panic(err)
}
var lns []kademlia.ListedNode
err = client.Call("Admin.Find", id, &lns)
if err != nil {
panic(err)
}
fmt.Println(lns)
}

+ 5
- 0
run-dev-nodes.sh

@ -1,13 +1,18 @@
#!/bin/sh
SESSION='go-dht' SESSION='go-dht'
tmux new-session -d -s $SESSION tmux new-session -d -s $SESSION
tmux split-window -d -t 0 -v tmux split-window -d -t 0 -v
tmux split-window -d -t 0 -h tmux split-window -d -t 0 -h
tmux split-window -d -t 2 -h
tmux send-keys -t 0 'go run main.go --config config.test0.yaml --debug start' enter tmux send-keys -t 0 'go run main.go --config config.test0.yaml --debug start' enter
sleep 2 sleep 2
tmux send-keys -t 1 'go run main.go --config config.test1.yaml --debug start' enter tmux send-keys -t 1 'go run main.go --config config.test1.yaml --debug start' enter
tmux send-keys -t 2 'go run main.go --config config.test2.yaml --debug start' enter tmux send-keys -t 2 'go run main.go --config config.test2.yaml --debug start' enter
sleep 1
tmux send-keys -t 3 'go run main.go --config config.test3.yaml --debug start' enter
tmux attach tmux attach

Loading…
Cancel
Save