From ff0ba3995a7ec37ee2119c719313de9fdfd4f5d4 Mon Sep 17 00:00:00 2001 From: arnaucube Date: Sun, 15 Dec 2019 17:22:35 +0100 Subject: [PATCH] add admin RPC find call, add kademlia NodeLookup interactive --- README.md | 32 ++++++++++++++- cmd/cmd.go | 9 +++++ config.test0.yaml | 1 + config.test1.yaml | 1 + config.test2.yaml | 1 + config.test3.yaml | 9 +++++ config/config.go | 1 + kademlia/kademlia.go | 82 ++++++++++++++++++++++++++++++++++++++- kademlia/kademlia_test.go | 6 +-- node/admin.go | 78 +++++++++++++++++++++++++++++++++++++ node/node.go | 18 +++++++-- rpc-test/test.go | 43 ++++++++++++++------ run-dev-nodes.sh | 5 +++ 13 files changed, 264 insertions(+), 22 deletions(-) create mode 100644 config.test3.yaml create mode 100644 node/admin.go mode change 100644 => 100755 run-dev-nodes.sh diff --git a/README.md b/README.md index cd76004..9946743 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,37 @@ To run a node: go run main.go --config config.test0.yaml --debug start ``` -To run 3 test nodes inside a tmux session: + +## Test +- Scenario: +``` ++--+ +--+ +|n0+-----------+n1| ++-++ +--+ + | + | + | +--+ +--+ + +----+n2+-----------+n3| + +--+ +--+ +``` + +- To run 4 test nodes inside a tmux session: ``` bash run-dev-nodes.sh ``` + +Using the `test.go` in the `rpc-test` directory: + +- calls to the node to perform lookups + - `go run test.go -find` + - performs an `admin` call to `Find` node, to the `n0`, asking about the `n3` +- calls to simulate kademlia protocol rpc calls + - `go run test.go -ping` + - performs the `PING` call + - `go run test.go -findnode` + - performs the `FIND_NODE` call + - `go run test.go -findvalue` + - performs the `FIND_VALUE` call + - `go run test.go -store` + - performs the `STORE` call + diff --git a/cmd/cmd.go b/cmd/cmd.go index cd6d1d8..c3bdd03 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -43,6 +43,15 @@ func cmdStart(c *cli.Context) error { } log.Info("New node created with ID: ", n.ID()) } + + go func() { + admin := node.NewAdmin(n) + err := admin.Start() + if err != nil { + panic(err) + } + }() + err = n.Start() return err } diff --git a/config.test0.yaml b/config.test0.yaml index cf6956c..46b2e95 100644 --- a/config.test0.yaml +++ b/config.test0.yaml @@ -1,4 +1,5 @@ id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7" addr: 127.0.0.1 port: 5000 +adminport: 6000 storage: "tmp" diff --git a/config.test1.yaml b/config.test1.yaml index ccf8d4a..03b3cee 100644 --- a/config.test1.yaml +++ b/config.test1.yaml @@ -1,6 +1,7 @@ id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d5" addr: 127.0.0.1 port: 5001 +adminport: 6001 knownNodes: - id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7" addr: 127.0.0.1 diff --git a/config.test2.yaml b/config.test2.yaml index 9a48695..2c29033 100644 --- a/config.test2.yaml +++ b/config.test2.yaml @@ -1,6 +1,7 @@ id: "1ff734fb9897600ca54a9c55ace2d22a51afb610" addr: 127.0.0.1 port: 5002 +adminport: 6002 knownNodes: - id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7" addr: 127.0.0.1 diff --git a/config.test3.yaml b/config.test3.yaml new file mode 100644 index 0000000..9481d9f --- /dev/null +++ b/config.test3.yaml @@ -0,0 +1,9 @@ +id: "c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b" +addr: 127.0.0.1 +port: 5003 +adminport: 6003 +knownNodes: +- id: "1ff734fb9897600ca54a9c55ace2d22a51afb610" + addr: 127.0.0.1 + port: 5002 +storage: "tmp" diff --git a/config/config.go b/config/config.go index 0ea89f7..753f199 100644 --- a/config/config.go +++ b/config/config.go @@ -11,6 +11,7 @@ type Config struct { ID string Addr string Port string + AdminPort string KnownNodesStr []KnownNodeStr `mapstructure:"knownnodes"` KnownNodes []kademlia.ListedNode `mapstructure:"-"` Storage string diff --git a/kademlia/kademlia.go b/kademlia/kademlia.go index 7491469..8e7ad3d 100644 --- a/kademlia/kademlia.go +++ b/kademlia/kademlia.go @@ -2,6 +2,7 @@ package kademlia import ( "errors" + "fmt" "math/bits" "net/rpc" "strconv" @@ -50,8 +51,12 @@ func (kad Kademlia) String() string { return r } -func (kad Kademlia) FindClosestKBucket(id ID) (int, error) { +func (kad Kademlia) GetClosestKBucket(id ID) (int, error) { kb := kad.KBucket(id) + if kb == 0 { + // is this node + return kb, nil + } if len(kad.KBuckets[kb]) != 0 { return kb, nil } @@ -118,7 +123,7 @@ func (kad *Kademlia) Update(o ListedNode) { } func (kad *Kademlia) PingOldNode(k int, o ListedNode) { - // TODO when rpc layer is done + // TODO // ping the n.KBuckets[k][0] (using goroutine) // if no response (timeout), delete it and add 'o' // n.KBuckets[k][0] = o @@ -177,3 +182,76 @@ func moveToBottom(kb []ListedNode, k int) []ListedNode { kb = append(kb[:], e) return kb } +func removeFromListedNodes(lns []ListedNode, k int) []ListedNode { + lns = append(lns[:k], lns[k+1:]...) + return lns +} + +func (kad Kademlia) CallFindNode(id ID, o ListedNode) ([]ListedNode, error) { + client, err := rpc.DialHTTP("tcp", o.Addr+":"+o.Port) + if err != nil { + return nil, err + } + var lns []ListedNode + err = client.Call("Node.FindNode", id, &lns) + if err != nil { + return nil, err + } + + return lns, nil +} + +func (kad Kademlia) NodeLookup(id ID) ([]ListedNode, error) { + log.Debug("[NodeLookup] get closest KBucket for ", id) + // find closest kbucket for this current node + k, err := kad.GetClosestKBucket(id) + if err != nil { + return []ListedNode{}, fmt.Errorf("No KBuckets") + } + log.Debug("[NodeLookup] closest KBucket", k) + + var closest ListedNode + var newClosest ListedNode + closest = kad.N + var lns []ListedNode + lns = kad.KBuckets[k] + for !closest.ID.Equal(newClosest.ID) { // TODO + // call each ListedNode from the kbucket, asking for their list of closest kbucket + var newLns []ListedNode + closest = newClosest + for k, ln := range lns { + // TODO TMP for the moment is synchronous + fmt.Println("calling node", ln) + flns, err := kad.CallFindNode(id, ln) + if err != nil { + log.Debug(err) + } + removeFromListedNodes(lns, k) + // newLns = append(newLns, flns...) + for _, fln := range flns { + exist, _ := existsInListedNodes(newLns, fln) + if !exist { + fmt.Println("ln", fln.ID, " not exists") + newLns = append(newLns, fln) + } + } + } + lns = newLns + newClosest = updateClosest(id, lns, closest) + } + return lns, nil +} + +func updateClosest(id ID, lns []ListedNode, closest ListedNode) ListedNode { + d := id.Distance(closest.ID) + closestZ := countZeroes(d[:]) + for _, ln := range lns { + d := id.Distance(ln.ID) + z := countZeroes(d[:]) + if z < closestZ { + closest = ln + closestZ = z + } + } + return closest +} diff --git a/kademlia/kademlia_test.go b/kademlia/kademlia_test.go index 8e5ed9d..6d8c9df 100644 --- a/kademlia/kademlia_test.go +++ b/kademlia/kademlia_test.go @@ -152,7 +152,7 @@ func TestFindClosestKBucket(t *testing.T) { idB, err := IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a18d5") assert.Nil(t, err) - k, err := kademlia.FindClosestKBucket(idB) + k, err := kademlia.GetClosestKBucket(idB) assert.Nil(t, err) assert.Equal(t, 2, k) @@ -164,7 +164,7 @@ func TestFindClosestKBucket(t *testing.T) { assert.Equal(t, 3, k) // while the real KBucket (as the 3 is empty), should be 2 - k, err = kademlia.FindClosestKBucket(idB) + k, err = kademlia.GetClosestKBucket(idB) assert.Nil(t, err) assert.Equal(t, 2, k) @@ -176,7 +176,7 @@ func TestFindClosestKBucket(t *testing.T) { assert.Equal(t, 5, k) // while the real KBucket (as the 3 is empty), should be 2 - k, err = kademlia.FindClosestKBucket(idB) + k, err = kademlia.GetClosestKBucket(idB) assert.Nil(t, err) assert.Equal(t, 7, k) } diff --git a/node/admin.go b/node/admin.go new file mode 100644 index 0000000..9e8d4b7 --- /dev/null +++ b/node/admin.go @@ -0,0 +1,78 @@ +package node + +import ( + "go-dht/config" + "go-dht/kademlia" + "io/ioutil" + "net" + "net/http" + "net/rpc" + + log "github.com/sirupsen/logrus" +) + +type Admin struct { + node Node + disc map[kademlia.ID][]kademlia.ListedNode +} + +func NewAdmin(node Node) Admin { + return Admin{ + node: node, + } +} + +func (a *Admin) Start() error { + // rpc server + err := rpc.Register(a) + if err != nil { + return err + } + // + oldMux := http.DefaultServeMux + mux := http.NewServeMux() + http.DefaultServeMux = mux + // + rpc.HandleHTTP() + // + http.DefaultServeMux = oldMux + // + listener, err := net.Listen("tcp", ":"+config.C.AdminPort) + if err != nil { + return err + } + + err = http.Serve(listener, nil) + if err != nil { + return err + } + return nil +} + +func (a *Admin) Find(id kademlia.ID, lns *[]kademlia.ListedNode) error { + log.Info("[admin-rpc] FIND ", id) + + // check if id in current node + _, err := ioutil.ReadFile(config.C.Storage + "/" + id.String()) + if err == nil { + *lns = []kademlia.ListedNode{ + kademlia.ListedNode{ + ID: a.node.ID(), + Addr: config.C.Addr, + Port: config.C.Port, + }, + } + log.Info("[admin-rpc] FIND found") + return nil + } + log.Info("[admin-rpc] FIND not in local Node, starting NodeLookup") + + rlns, err := a.node.Kademlia().NodeLookup(id) + if err != nil { + log.Debug("[admin-rpc/FIND] ERROR: ", err) + return err + } + *lns = rlns + + return nil +} diff --git a/node/node.go b/node/node.go index b359fb7..162256c 100644 --- a/node/node.go +++ b/node/node.go @@ -44,6 +44,10 @@ func (n Node) ID() kademlia.ID { return n.kademlia.N.ID } +func (n Node) Kademlia() kademlia.Kademlia { + return *n.kademlia +} + func (n *Node) Start() error { // rpc server err := rpc.Register(n) @@ -60,11 +64,16 @@ func (n *Node) Start() error { // TMP in order to print the KBuckets of the node for { fmt.Println(n.kademlia) - time.Sleep(5 * time.Second) + time.Sleep(8 * time.Second) } }() go n.pingKnownNodes(config.C.KnownNodes) + go n.kademlia.Update(kademlia.ListedNode{ + ID: n.ID(), + Addr: config.C.Addr, + Port: config.C.Port, + }) err = http.Serve(listener, nil) if err != nil { @@ -126,11 +135,12 @@ func (n *Node) Store(data []byte, ack *bool) error { return nil } -func (n *Node) FindNode(ln kademlia.ListedNode, lns *[]kademlia.ListedNode) error { +func (n *Node) FindNode(id kademlia.ID, lns *[]kademlia.ListedNode) error { log.Info("[rpc] FIND_NODE") // k := n.kademlia.KBucket(ln.ID) - k, err := n.kademlia.FindClosestKBucket(ln.ID) + k, err := n.kademlia.GetClosestKBucket(id) if err != nil { + log.Debug("[rpc] FIND_NODE ERROR: ", err) *lns = []kademlia.ListedNode{} return nil } @@ -158,7 +168,7 @@ func (n *Node) FindValue(id kademlia.ID, resp *FindValueResp) error { } // k := n.kademlia.KBucket(id) - k, err := n.kademlia.FindClosestKBucket(id) + k, err := n.kademlia.GetClosestKBucket(id) if err != nil { *resp = FindValueResp{} return nil diff --git a/rpc-test/test.go b/rpc-test/test.go index 2d1211b..d9234a7 100644 --- a/rpc-test/test.go +++ b/rpc-test/test.go @@ -14,10 +14,15 @@ import ( // Utility to test the Node RPC methods func main() { + // public rpc pingFlag := flag.Bool("ping", false, "test Ping") findnodeFlag := flag.Bool("findnode", false, "test FindNode") findvalueFlag := flag.Bool("findvalue", false, "test FindValue") storeFlag := flag.Bool("store", false, "test Store") + + // admin rpc + findFlag := flag.Bool("find", false, "test Find") + flag.Parse() if *pingFlag { @@ -32,6 +37,9 @@ func main() { if *storeFlag { testStore() } + if *findFlag { + testFind() + } } func testPing() { @@ -57,18 +65,13 @@ func testFindNode() { if err != nil { panic(err) } - ln := kademlia.ListedNode{ - ID: id, - Addr: "", - Port: "", - } client, err := rpc.DialHTTP("tcp", "127.0.0.1:5000") if err != nil { log.Fatal("Connection error: ", err) } var reply []kademlia.ListedNode - err = client.Call("Node.FindNode", ln, &reply) + err = client.Call("Node.FindNode", id, &reply) if err != nil { panic(err) } @@ -79,14 +82,9 @@ func testFindNode() { if err != nil { panic(err) } - ln = kademlia.ListedNode{ - ID: id, - Addr: "", - Port: "", - } var reply2 []kademlia.ListedNode - err = client.Call("Node.FindNode", ln, &reply2) + err = client.Call("Node.FindNode", id, &reply2) if err != nil { panic(err) } @@ -177,3 +175,24 @@ func prepareTestListedNodes() []kademlia.ListedNode { } return lns } + +// through admin rpc +func testFind() { + client, err := rpc.DialHTTP("tcp", "127.0.0.1:6000") + if err != nil { + log.Fatal("Connection error: ", err) + } + + idStr := "c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b" + // idStr := "1ff734fb9897600ca54a9c55ace2d22a51afb610" + id, err := kademlia.IDFromString(idStr) + if err != nil { + panic(err) + } + var lns []kademlia.ListedNode + err = client.Call("Admin.Find", id, &lns) + if err != nil { + panic(err) + } + fmt.Println(lns) +} diff --git a/run-dev-nodes.sh b/run-dev-nodes.sh old mode 100644 new mode 100755 index f4b2d14..c43441d --- a/run-dev-nodes.sh +++ b/run-dev-nodes.sh @@ -1,13 +1,18 @@ +#!/bin/sh + SESSION='go-dht' tmux new-session -d -s $SESSION tmux split-window -d -t 0 -v tmux split-window -d -t 0 -h +tmux split-window -d -t 2 -h tmux send-keys -t 0 'go run main.go --config config.test0.yaml --debug start' enter sleep 2 tmux send-keys -t 1 'go run main.go --config config.test1.yaml --debug start' enter tmux send-keys -t 2 'go run main.go --config config.test2.yaml --debug start' enter +sleep 1 +tmux send-keys -t 3 'go run main.go --config config.test3.yaml --debug start' enter tmux attach