Browse Source

node rpc PING & PONG methods and calls

master
arnaucube 5 years ago
parent
commit
229a9003bb
11 changed files with 138 additions and 30 deletions
  1. +14
    -1
      README.md
  2. +5
    -0
      cmd/cmd.go
  3. +3
    -0
      config.test0.yaml
  4. +7
    -0
      config.test1.yaml
  5. +7
    -0
      config.test2.yaml
  6. +0
    -10
      config.yaml
  7. +53
    -10
      kademlia/kademlia.go
  8. +3
    -3
      kademlia/kademlia_test.go
  9. +1
    -1
      main.go
  10. +32
    -5
      node/node.go
  11. +13
    -0
      run-dev-nodes.sh

+ 14
- 1
README.md

@ -2,4 +2,17 @@
Kademlia DHT Go implementation.
Following the specification from https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf and http://xlattice.sourceforge.net/components/protocol/kademlia/specs.html
Following the specification from
- https://pdos.csail.mit.edu/~petar/papers/maymounkov-kademlia-lncs.pdf
- http://xlattice.sourceforge.net/components/protocol/kademlia/specs.html
## Run
To run a node:
```
go run main.go --config config.test0.yaml --debug start
```
To run 3 test nodes inside a tmux session:
```
bash run-dev-nodes.sh
```

+ 5
- 0
cmd/cmd.go

@ -18,6 +18,11 @@ var NodeCommands = []cli.Command{
}
func cmdStart(c *cli.Context) error {
if c.GlobalBool("debug") {
log.SetLevel(log.DebugLevel)
log.SetReportCaller(true)
}
if err := config.MustRead(c); err != nil {
return err
}

+ 3
- 0
config.test0.yaml

@ -0,0 +1,3 @@
id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7"
addr: 127.0.0.1
port: 5000

+ 7
- 0
config.test1.yaml

@ -0,0 +1,7 @@
id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d0"
addr: 127.0.0.1
port: 5001
knownNodes:
- id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7"
addr: 127.0.0.1
port: 5000

+ 7
- 0
config.test2.yaml

@ -0,0 +1,7 @@
id: "c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b"
addr: 127.0.0.1
port: 5002
knownNodes:
- id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7"
addr: 127.0.0.1
port: 5000

+ 0
- 10
config.yaml

@ -1,10 +0,0 @@
id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7"
addr: 127.0.0.1
port: 5000
knownNodes:
- id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7"
addr: 127.0.0.1
port: 5001
- id: "c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b"
addr: 127.0.0.1
port: 5002

+ 53
- 10
kademlia/kademlia.go

@ -2,6 +2,7 @@ package kademlia
import (
"math/bits"
"net/rpc"
"strconv"
log "github.com/sirupsen/logrus"
@ -20,18 +21,23 @@ type ListedNode struct {
}
type Kademlia struct {
ID ID
// N is this node data
N ListedNode
KBuckets [B * 8][]ListedNode
}
func NewKademliaTable(id ID) *Kademlia {
func NewKademliaTable(id ID, addr, port string) *Kademlia {
return &Kademlia{
ID: id,
N: ListedNode{
ID: id,
Addr: addr,
Port: port,
},
}
}
func (kad Kademlia) String() string {
r := "Node ID: " + kad.ID.String() + ", KBuckets:\n"
r := "Node ID: " + kad.N.ID.String() + ", KBuckets:\n"
for i, kb := range kad.KBuckets {
if len(kb) > 0 {
r += " KBucket " + strconv.Itoa(i) + "\n"
@ -44,7 +50,7 @@ func (kad Kademlia) String() string {
}
func (kad Kademlia) KBucket(o ID) int {
d := kad.ID.Distance(o)
d := kad.N.ID.Distance(o)
return kBucketByDistance(d[:])
}
@ -63,8 +69,8 @@ func (kad *Kademlia) Update(o ListedNode) {
kb := kad.KBuckets[k]
if len(kb) >= KBucketSize {
// if n.KBuckets[k] is alrady full, perform ping of the first element
log.Debug("node.KBuckets[k] already full, performing ping to node.KBuckets[0]")
kad.Ping(k, o)
log.Info("node.KBuckets[k] already full, performing ping to node.KBuckets[0]")
kad.PingOldNode(k, o)
return
}
// check that is not already in the list
@ -72,22 +78,59 @@ func (kad *Kademlia) Update(o ListedNode) {
if exist {
// update position of o to the bottom
kad.KBuckets[k] = moveToBottom(kad.KBuckets[k], pos)
log.Debug("ListedNode already exists, moved to bottom")
log.Info("ListedNode already exists, moved to bottom")
return
}
// not exists, add it to the kBucket
kad.KBuckets[k] = append(kad.KBuckets[k], o)
log.Debug("ListedNode not exists, added to the bottom")
log.Info("ListedNode not exists, added to the bottom")
return
}
func (kad *Kademlia) Ping(k int, o ListedNode) {
func (kad *Kademlia) PingOldNode(k int, o ListedNode) {
// TODO when rpc layer is done
// ping the n.KBuckets[k][0] (using goroutine)
// if no response (timeout), delete it and add 'o'
// n.KBuckets[k][0] = o
}
func (kad *Kademlia) CallPing(o ListedNode) error {
client, err := rpc.DialHTTP("tcp", o.Addr+":"+o.Port)
if err != nil {
return err
}
ln := ListedNode{
ID: kad.N.ID,
Addr: kad.N.Addr,
Port: kad.N.Port,
}
var reply ListedNode
err = client.Call("Node.Ping", ln, &reply)
if err != nil {
return err
}
return nil
}
func (kad *Kademlia) CallPong(o ListedNode) error {
client, err := rpc.DialHTTP("tcp", o.Addr+":"+o.Port)
if err != nil {
return err
}
ln := ListedNode{
ID: kad.N.ID,
Addr: kad.N.Addr,
Port: kad.N.Port,
}
var reply bool
err = client.Call("Node.Pong", ln, &reply)
if err != nil {
return err
}
return nil
}
func existsInListedNodes(lns []ListedNode, ln ListedNode) (bool, int) {
for i, v := range lns {
if v.ID.Equal(ln.ID) {

+ 3
- 3
kademlia/kademlia_test.go

@ -40,9 +40,9 @@ func TestCountZeros(t *testing.T) {
func TestKBucket(t *testing.T) {
idA, err := IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a18d7")
assert.Nil(t, err)
kademlia := NewKademliaTable(idA)
kademlia := NewKademliaTable(idA, "127.0.0.1", "5000")
d := kademlia.KBucket(kademlia.ID)
d := kademlia.KBucket(kademlia.N.ID)
assert.Equal(t, 0, d) // same node should have distance 0
idB, err := IDFromString("c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b")
@ -100,7 +100,7 @@ func TestMoveToBottom(t *testing.T) {
func TestUpdate(t *testing.T) {
idA, err := IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a18d7")
assert.Nil(t, err)
kademlia := NewKademliaTable(idA)
kademlia := NewKademliaTable(idA, "127.0.0.1", "5000")
lns := prepareTestListedNodes()
for _, lnI := range lns {

+ 1
- 1
main.go

@ -10,13 +10,13 @@ import (
)
func main() {
log.SetLevel(log.DebugLevel)
app := cli.NewApp()
app.Name = "go-dht"
app.Version = "0.0.1-alpha"
app.Flags = []cli.Flag{
cli.StringFlag{Name: "config"},
cli.BoolFlag{Name: "debug"},
}
app.Commands = []cli.Command{}

+ 32
- 5
node/node.go

@ -1,11 +1,13 @@
package node
import (
"fmt"
"go-dht/config"
"go-dht/kademlia"
"net"
"net/http"
"net/rpc"
"time"
log "github.com/sirupsen/logrus"
)
@ -22,7 +24,7 @@ func NewNode() (Node, error) {
}
var n Node
n.kademlia = kademlia.NewKademliaTable(id)
n.kademlia = kademlia.NewKademliaTable(id, config.C.Addr, config.C.Port)
return n, nil
}
@ -32,15 +34,16 @@ func LoadNode(idStr string) (Node, error) {
return Node{}, err
}
var n Node
n.kademlia = kademlia.NewKademliaTable(id)
n.kademlia = kademlia.NewKademliaTable(id, config.C.Addr, config.C.Port)
return n, nil
}
func (n Node) ID() kademlia.ID {
return n.kademlia.ID
return n.kademlia.N.ID
}
func (n *Node) Start() error {
// rpc server
err := rpc.Register(n)
if err != nil {
return err
@ -50,11 +53,31 @@ func (n *Node) Start() error {
if err != nil {
return err
}
go func() {
// TMP in order to print the KBuckets of the node
for {
fmt.Println(n.kademlia)
time.Sleep(5 * time.Second)
}
}()
go n.pingKnownNodes(config.C.KnownNodes)
err = http.Serve(listener, nil)
if err != nil {
return err
}
// TODO ping config.C.KnownNodes
return nil
}
func (n *Node) pingKnownNodes(lns []kademlia.ListedNode) error {
for _, ln := range lns {
err := n.kademlia.CallPing(ln)
if err != nil {
log.Warning("[pingKnownNodes]", err)
}
}
return nil
}
@ -68,7 +91,11 @@ func (n *Node) Ping(ln kademlia.ListedNode, thisLn *kademlia.ListedNode) error {
Addr: config.C.Addr,
Port: config.C.Port,
}
// TODO perform PONG call to the requester (maybe ping&pong can be unified)
// perform PONG call to the requester (maybe ping&pong can be unified)
err := n.kademlia.CallPong(ln)
if err != nil {
log.Warning("[PONG]", err)
}
return nil
}

+ 13
- 0
run-dev-nodes.sh

@ -0,0 +1,13 @@
SESSION='go-dht'
tmux new-session -d -s $SESSION
tmux split-window -d -t 0 -v
tmux split-window -d -t 0 -h
tmux send-keys -t 0 'go run main.go --config config.test0.yaml --debug start' enter
sleep 2
tmux send-keys -t 1 'go run main.go --config config.test1.yaml --debug start' enter
tmux send-keys -t 2 'go run main.go --config config.test2.yaml --debug start' enter
tmux attach

Loading…
Cancel
Save