diff --git a/README.md b/README.md index 1379f56..cd76004 100644 --- a/README.md +++ b/README.md @@ -2,4 +2,17 @@ Kademlia DHT Go implementation. -Following the specification from https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf and http://xlattice.sourceforge.net/components/protocol/kademlia/specs.html +Following the specification from +- https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf +- http://xlattice.sourceforge.net/components/protocol/kademlia/specs.html + +## Run +To run a node: +``` +go run main.go --config config.test0.yaml --debug start +``` + +To run 3 test nodes inside a tmux session: +``` +bash run-dev-nodes.sh +``` diff --git a/cmd/cmd.go b/cmd/cmd.go index 9c72440..cd6d1d8 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -18,6 +18,11 @@ var NodeCommands = []cli.Command{ } func cmdStart(c *cli.Context) error { + if c.GlobalBool("debug") { + log.SetLevel(log.DebugLevel) + log.SetReportCaller(true) + } + if err := config.MustRead(c); err != nil { return err } diff --git a/config.test0.yaml b/config.test0.yaml new file mode 100644 index 0000000..f47db35 --- /dev/null +++ b/config.test0.yaml @@ -0,0 +1,3 @@ +id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7" +addr: 127.0.0.1 +port: 5000 diff --git a/config.test1.yaml b/config.test1.yaml new file mode 100644 index 0000000..68b71b3 --- /dev/null +++ b/config.test1.yaml @@ -0,0 +1,7 @@ +id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d0" +addr: 127.0.0.1 +port: 5001 +knownNodes: +- id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7" + addr: 127.0.0.1 + port: 5000 diff --git a/config.test2.yaml b/config.test2.yaml new file mode 100644 index 0000000..d9ede15 --- /dev/null +++ b/config.test2.yaml @@ -0,0 +1,7 @@ +id: "c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b" +addr: 127.0.0.1 +port: 5002 +knownNodes: +- id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7" + addr: 127.0.0.1 + port: 5000 diff --git a/config.yaml b/config.yaml deleted file mode 100644 index 6f8988c..0000000 --- a/config.yaml +++ /dev/null @@ -1,10 +0,0 @@ -id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7" -addr: 127.0.0.1 -port: 5000 -knownNodes: -- id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7" - addr: 127.0.0.1 - port: 5001 -- id: "c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b" - addr: 127.0.0.1 - port: 5002 diff --git a/kademlia/kademlia.go b/kademlia/kademlia.go index 252d202..4ed31c3 100644 --- a/kademlia/kademlia.go +++ b/kademlia/kademlia.go @@ -2,6 +2,7 @@ package kademlia import ( "math/bits" + "net/rpc" "strconv" log "github.com/sirupsen/logrus" @@ -20,18 +21,23 @@ type ListedNode struct { } type Kademlia struct { - ID ID + // N is this node data + N ListedNode KBuckets [B * 8][]ListedNode } -func NewKademliaTable(id ID) *Kademlia { +func NewKademliaTable(id ID, addr, port string) *Kademlia { return &Kademlia{ - ID: id, + N: ListedNode{ + ID: id, + Addr: addr, + Port: port, + }, } } func (kad Kademlia) String() string { - r := "Node ID: " + kad.ID.String() + ", KBuckets:\n" + r := "Node ID: " + kad.N.ID.String() + ", KBuckets:\n" for i, kb := range kad.KBuckets { if len(kb) > 0 { r += " KBucket " + strconv.Itoa(i) + "\n" @@ -44,7 +50,7 @@ func (kad Kademlia) String() string { } func (kad Kademlia) KBucket(o ID) int { - d := kad.ID.Distance(o) + d := kad.N.ID.Distance(o) return kBucketByDistance(d[:]) } @@ -63,8 +69,8 @@ func (kad *Kademlia) Update(o ListedNode) { kb := kad.KBuckets[k] if len(kb) >= KBucketSize { // if n.KBuckets[k] is alrady full, perform ping of the first element - log.Debug("node.KBuckets[k] already full, performing ping to node.KBuckets[0]") - kad.Ping(k, o) + log.Info("node.KBuckets[k] already full, performing ping to node.KBuckets[0]") + kad.PingOldNode(k, o) return } // check that is not already in the list @@ -72,22 +78,59 @@ func (kad *Kademlia) Update(o ListedNode) { if exist { // update position of o to the bottom kad.KBuckets[k] = moveToBottom(kad.KBuckets[k], pos) - log.Debug("ListedNode already exists, moved to bottom") + log.Info("ListedNode already exists, moved to bottom") return } // not exists, add it to the kBucket kad.KBuckets[k] = append(kad.KBuckets[k], o) - log.Debug("ListedNode not exists, added to the bottom") + log.Info("ListedNode not exists, added to the bottom") return } -func (kad *Kademlia) Ping(k int, o ListedNode) { +func (kad *Kademlia) PingOldNode(k int, o ListedNode) { // TODO when rpc layer is done // ping the n.KBuckets[k][0] (using goroutine) // if no response (timeout), delete it and add 'o' // n.KBuckets[k][0] = o } +func (kad *Kademlia) CallPing(o ListedNode) error { + client, err := rpc.DialHTTP("tcp", o.Addr+":"+o.Port) + if err != nil { + return err + } + ln := ListedNode{ + ID: kad.N.ID, + Addr: kad.N.Addr, + Port: kad.N.Port, + } + var reply ListedNode + err = client.Call("Node.Ping", ln, &reply) + if err != nil { + return err + } + return nil +} + +func (kad *Kademlia) CallPong(o ListedNode) error { + client, err := rpc.DialHTTP("tcp", o.Addr+":"+o.Port) + if err != nil { + return err + } + ln := ListedNode{ + ID: kad.N.ID, + Addr: kad.N.Addr, + Port: kad.N.Port, + } + var reply bool + err = client.Call("Node.Pong", ln, &reply) + if err != nil { + return err + } + + return nil +} + func existsInListedNodes(lns []ListedNode, ln ListedNode) (bool, int) { for i, v := range lns { if v.ID.Equal(ln.ID) { diff --git a/kademlia/kademlia_test.go b/kademlia/kademlia_test.go index 339bde6..6fac2c2 100644 --- a/kademlia/kademlia_test.go +++ b/kademlia/kademlia_test.go @@ -40,9 +40,9 @@ func TestCountZeros(t *testing.T) { func TestKBucket(t *testing.T) { idA, err := IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a18d7") assert.Nil(t, err) - kademlia := NewKademliaTable(idA) + kademlia := NewKademliaTable(idA, "127.0.0.1", "5000") - d := kademlia.KBucket(kademlia.ID) + d := kademlia.KBucket(kademlia.N.ID) assert.Equal(t, 0, d) // same node should have distance 0 idB, err := IDFromString("c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b") @@ -100,7 +100,7 @@ func TestMoveToBottom(t *testing.T) { func TestUpdate(t *testing.T) { idA, err := IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a18d7") assert.Nil(t, err) - kademlia := NewKademliaTable(idA) + kademlia := NewKademliaTable(idA, "127.0.0.1", "5000") lns := prepareTestListedNodes() for _, lnI := range lns { diff --git a/main.go b/main.go index e918aeb..0665bca 100644 --- a/main.go +++ b/main.go @@ -10,13 +10,13 @@ import ( ) func main() { - log.SetLevel(log.DebugLevel) app := cli.NewApp() app.Name = "go-dht" app.Version = "0.0.1-alpha" app.Flags = []cli.Flag{ cli.StringFlag{Name: "config"}, + cli.BoolFlag{Name: "debug"}, } app.Commands = []cli.Command{} diff --git a/node/node.go b/node/node.go index 1e6b932..6a55882 100644 --- a/node/node.go +++ b/node/node.go @@ -1,11 +1,13 @@ package node import ( + "fmt" "go-dht/config" "go-dht/kademlia" "net" "net/http" "net/rpc" + "time" log "github.com/sirupsen/logrus" ) @@ -22,7 +24,7 @@ func NewNode() (Node, error) { } var n Node - n.kademlia = kademlia.NewKademliaTable(id) + n.kademlia = kademlia.NewKademliaTable(id, config.C.Addr, config.C.Port) return n, nil } @@ -32,15 +34,16 @@ func LoadNode(idStr string) (Node, error) { return Node{}, err } var n Node - n.kademlia = kademlia.NewKademliaTable(id) + n.kademlia = kademlia.NewKademliaTable(id, config.C.Addr, config.C.Port) return n, nil } func (n Node) ID() kademlia.ID { - return n.kademlia.ID + return n.kademlia.N.ID } func (n *Node) Start() error { + // rpc server err := rpc.Register(n) if err != nil { return err @@ -50,11 +53,31 @@ func (n *Node) Start() error { if err != nil { return err } + + go func() { + // TMP in order to print the KBuckets of the node + for { + fmt.Println(n.kademlia) + time.Sleep(5 * time.Second) + } + }() + + go n.pingKnownNodes(config.C.KnownNodes) + err = http.Serve(listener, nil) if err != nil { return err } - // TODO ping config.C.KnownNodes + return nil +} + +func (n *Node) pingKnownNodes(lns []kademlia.ListedNode) error { + for _, ln := range lns { + err := n.kademlia.CallPing(ln) + if err != nil { + log.Warning("[pingKnownNodes]", err) + } + } return nil } @@ -68,7 +91,11 @@ func (n *Node) Ping(ln kademlia.ListedNode, thisLn *kademlia.ListedNode) error { Addr: config.C.Addr, Port: config.C.Port, } - // TODO perform PONG call to the requester (maybe ping&pong can be unified) + // perform PONG call to the requester (maybe ping&pong can be unified) + err := n.kademlia.CallPong(ln) + if err != nil { + log.Warning("[PONG]", err) + } return nil } diff --git a/run-dev-nodes.sh b/run-dev-nodes.sh new file mode 100644 index 0000000..f4b2d14 --- /dev/null +++ b/run-dev-nodes.sh @@ -0,0 +1,13 @@ +SESSION='go-dht' + +tmux new-session -d -s $SESSION +tmux split-window -d -t 0 -v +tmux split-window -d -t 0 -h + + +tmux send-keys -t 0 'go run main.go --config config.test0.yaml --debug start' enter +sleep 2 +tmux send-keys -t 1 'go run main.go --config config.test1.yaml --debug start' enter +tmux send-keys -t 2 'go run main.go --config config.test2.yaml --debug start' enter + +tmux attach