Browse Source

add FindClosestKBucket, add rpc FindNode & FindValue & Store (wip)

master
arnaucube 5 years ago
parent
commit
d971d1503a
11 changed files with 318 additions and 44 deletions
  1. +1
    -0
      .gitignore
  2. +1
    -0
      config.test0.yaml
  3. +2
    -1
      config.test1.yaml
  4. +2
    -1
      config.test2.yaml
  5. +1
    -0
      config/config.go
  6. +8
    -3
      kademlia/id.go
  7. +6
    -2
      kademlia/id_test.go
  8. +36
    -5
      kademlia/kademlia.go
  9. +76
    -12
      kademlia/kademlia_test.go
  10. +53
    -2
      node/node.go
  11. +132
    -18
      rpc-test/test.go

+ 1
- 0
.gitignore

@ -1 +1,2 @@
tmp.go
tmp

+ 1
- 0
config.test0.yaml

@ -1,3 +1,4 @@
id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7"
addr: 127.0.0.1
port: 5000
storage: "tmp"

+ 2
- 1
config.test1.yaml

@ -1,7 +1,8 @@
id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d0"
id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d5"
addr: 127.0.0.1
port: 5001
knownNodes:
- id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7"
addr: 127.0.0.1
port: 5000
storage: "tmp"

+ 2
- 1
config.test2.yaml

@ -1,7 +1,8 @@
id: "c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b"
id: "1ff734fb9897600ca54a9c55ace2d22a51afb610"
addr: 127.0.0.1
port: 5002
knownNodes:
- id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7"
addr: 127.0.0.1
port: 5000
storage: "tmp"

+ 1
- 0
config/config.go

@ -13,6 +13,7 @@ type Config struct {
Port string
KnownNodesStr []KnownNodeStr `mapstructure:"knownnodes"`
KnownNodes []kademlia.ListedNode `mapstructure:"-"`
Storage string
}
type KnownNodeStr struct {

+ 8
- 3
kademlia/id.go

@ -3,8 +3,8 @@ package kademlia
import (
"bytes"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"fmt"
)
type ID [B]byte
@ -29,8 +29,6 @@ func (id ID) MarshalText() ([]byte, error) {
}
func (id *ID) UnmarshalText(data []byte) error {
fmt.Println("UNMARSHAL")
fmt.Println("d", string(data))
var err error
var idFromStr ID
idFromStr, err = IDFromString(string(data))
@ -72,3 +70,10 @@ func (idA ID) Distance(idB ID) ID {
}
return d
}
func HashData(d []byte) ID {
h := sha256.Sum256(d)
var r ID
copy(r[:], h[:20])
return r
}

+ 6
- 2
kademlia/id_test.go

@ -1,8 +1,8 @@
package kademlia
import (
"encoding/hex"
"encoding/json"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
@ -28,7 +28,6 @@ func TestIDMarshalers(t *testing.T) {
idStr, err := json.Marshal(id)
assert.Nil(t, err)
assert.Equal(t, "\"0fd85ddddf15aeec2d5d8b01b013dbca030a18d7\"", string(idStr))
fmt.Println("idStr", string(idStr))
var idParsed ID
err = json.Unmarshal(idStr, &idParsed)
@ -56,3 +55,8 @@ func TestIDDistance(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, "cb55d68e04fa18e5c0131fd236ce80e8ecc1348c", idA.Distance(idB).String())
}
func TestHashData(t *testing.T) {
h := HashData([]byte("test data"))
assert.Equal(t, "916f0027a575074ce72a331777c3478d6513f786", hex.EncodeToString(h[:]))
}

+ 36
- 5
kademlia/kademlia.go

@ -1,6 +1,7 @@
package kademlia
import (
"errors"
"math/bits"
"net/rpc"
"strconv"
@ -23,7 +24,7 @@ type ListedNode struct {
type Kademlia struct {
// N is this node data
N ListedNode
KBuckets [B * 8][]ListedNode
KBuckets [B][]ListedNode
}
func NewKademliaTable(id ID, addr, port string) *Kademlia {
@ -49,6 +50,22 @@ func (kad Kademlia) String() string {
return r
}
func (kad Kademlia) FindClosestKBucket(id ID) (int, error) {
kb := kad.KBucket(id)
if len(kad.KBuckets[kb]) != 0 {
return kb, nil
}
for i := 0; kb-i > 0 || kb+i < 8; i++ {
if len(kad.KBuckets[kb+i]) != 0 {
return kb + i, nil
}
if len(kad.KBuckets[kb-i]) != 0 {
return kb - i, nil
}
}
return 0, errors.New("not found")
}
func (kad Kademlia) KBucket(o ID) int {
d := kad.N.ID.Distance(o)
return kBucketByDistance(d[:])
@ -56,9 +73,22 @@ func (kad Kademlia) KBucket(o ID) int {
}
func kBucketByDistance(b []byte) int {
z := countZeroes(b)
if z == 0 {
return 0
}
for i := 0; i < 8; i++ {
if z>>uint8(i) == 1 { // check until 1 instead of 0 to avoid one operation
return i
}
}
return 7
}
func countZeroes(b []byte) int {
for i := 0; i < B; i++ {
for a := b[i] ^ 0; a != 0; a &= a - 1 {
return (B-1-i)*8 + (7 - bits.TrailingZeros8(bits.Reverse8(uint8(a))))
return (B * 8) - (i * 8) - (8 - bits.TrailingZeros8(bits.Reverse8(uint8(a))))
}
}
return (B*8 - 1) - (B*8 - 1)
@ -69,7 +99,7 @@ func (kad *Kademlia) Update(o ListedNode) {
kb := kad.KBuckets[k]
if len(kb) >= KBucketSize {
// if n.KBuckets[k] is alrady full, perform ping of the first element
log.Info("node.KBuckets[k] already full, performing ping to node.KBuckets[0]")
log.Debug("node.KBuckets[k] already full, performing ping to node.KBuckets[0]")
kad.PingOldNode(k, o)
return
}
@ -78,12 +108,12 @@ func (kad *Kademlia) Update(o ListedNode) {
if exist {
// update position of o to the bottom
kad.KBuckets[k] = moveToBottom(kad.KBuckets[k], pos)
log.Info("ListedNode already exists, moved to bottom")
log.Debug("ListedNode (" + o.ID.String() + ") already exists, moved to bottom")
return
}
// not exists, add it to the kBucket
kad.KBuckets[k] = append(kad.KBuckets[k], o)
log.Info("ListedNode not exists, added to the bottom")
log.Debug("ListedNode (" + o.ID.String() + ") not exists, added to the bottom")
return
}
@ -109,6 +139,7 @@ func (kad *Kademlia) CallPing(o ListedNode) error {
if err != nil {
return err
}
// TODO use reply as PONG
return nil
}

+ 76
- 12
kademlia/kademlia_test.go

@ -4,37 +4,54 @@ import (
"fmt"
"testing"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)
var debug = false
func init() {
log.SetLevel(log.DebugLevel)
}
func TestCountZeros(t *testing.T) {
zeroes := []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
assert.Equal(t, 0, kBucketByDistance(zeroes))
b := []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
assert.Equal(t, 20, len(b))
assert.Equal(t, 159, kBucketByDistance(b))
assert.Equal(t, 7, kBucketByDistance(b))
b[19] = 0x00
assert.Equal(t, 159, kBucketByDistance(b))
assert.Equal(t, 7, kBucketByDistance(b))
b[0] = 0x0f
assert.Equal(t, 159-4, kBucketByDistance(b))
assert.Equal(t, 7, kBucketByDistance(b))
b[0] = 0x0c
assert.Equal(t, 159-4, kBucketByDistance(b))
assert.Equal(t, 7, kBucketByDistance(b))
b[0] = 0x00
b[1] = 0x00
b[2] = 0x0f
assert.Equal(t, 159-20, kBucketByDistance(b))
assert.Equal(t, 7, kBucketByDistance(b))
b[2] = 0x07
assert.Equal(t, 159-21, kBucketByDistance(b))
assert.Equal(t, 7, kBucketByDistance(b))
b[2] = 0x03
assert.Equal(t, 159-22, kBucketByDistance(b))
assert.Equal(t, 7, kBucketByDistance(b))
b = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
b[19] = 0x01
assert.Equal(t, 2, kBucketByDistance(b))
b[19] = 0x05
assert.Equal(t, 2, kBucketByDistance(b))
b[19] = 0x10
assert.Equal(t, 1, kBucketByDistance(b))
b[18] = 0x10
assert.Equal(t, 3, kBucketByDistance(b))
}
func TestKBucket(t *testing.T) {
@ -49,7 +66,7 @@ func TestKBucket(t *testing.T) {
assert.Nil(t, err)
d = kademlia.KBucket(idB)
assert.Equal(t, 159, d)
assert.Equal(t, 7, d)
}
func prepareTestListedNodes() []ListedNode {
@ -111,8 +128,55 @@ func TestUpdate(t *testing.T) {
fmt.Println(kademlia)
}
assert.Equal(t, len(kademlia.KBuckets[0]), 1)
assert.Equal(t, len(kademlia.KBuckets[1]), 1)
assert.Equal(t, len(kademlia.KBuckets[158]), 4)
assert.Equal(t, len(kademlia.KBuckets[159]), 5)
assert.Equal(t, 2, len(kademlia.KBuckets[0]))
assert.Equal(t, 0, len(kademlia.KBuckets[1]))
assert.Equal(t, 2, len(kademlia.KBuckets[2]))
assert.Equal(t, 0, len(kademlia.KBuckets[3]))
assert.Equal(t, 14, len(kademlia.KBuckets[7]))
}
func TestFindClosestKBucket(t *testing.T) {
idA, err := IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a18d7")
assert.Nil(t, err)
kademlia := NewKademliaTable(idA, "127.0.0.1", "5000")
lns := prepareTestListedNodes()
for _, lnI := range lns {
kademlia.Update(lnI)
}
if debug {
fmt.Println(kademlia)
}
idB, err := IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a18d5")
assert.Nil(t, err)
k, err := kademlia.FindClosestKBucket(idB)
assert.Nil(t, err)
assert.Equal(t, 2, k)
idB, err = IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a1000")
assert.Nil(t, err)
// the theorical KBucket should be 3
k = kademlia.KBucket(idB)
assert.Equal(t, 3, k)
// while the real KBucket (as the 3 is empty), should be 2
k, err = kademlia.FindClosestKBucket(idB)
assert.Nil(t, err)
assert.Equal(t, 2, k)
idB, err = IDFromString("0fd85ddddf15aeec2d5d8b01b013dbc000000000")
assert.Nil(t, err)
// the theorical KBucket should be 3
k = kademlia.KBucket(idB)
assert.Equal(t, 5, k)
// while the real KBucket (as the 3 is empty), should be 2
k, err = kademlia.FindClosestKBucket(idB)
assert.Nil(t, err)
assert.Equal(t, 7, k)
}

+ 53
- 2
node/node.go

@ -1,9 +1,11 @@
package node
import (
"encoding/hex"
"fmt"
"go-dht/config"
"go-dht/kademlia"
"io/ioutil"
"net"
"net/http"
"net/rpc"
@ -108,15 +110,64 @@ func (n *Node) Pong(ln kademlia.ListedNode, ack *bool) error {
func (n *Node) Store(data []byte, ack *bool) error {
log.Info("[rpc] STORE")
h := kademlia.HashData(data)
if n.kademlia.KBucket(h) != 0 {
*ack = false
log.Warning("[STORE] data not for this node")
return nil
}
err := ioutil.WriteFile(config.C.Storage+"/"+hex.EncodeToString(h[:]), data, 0644)
if err != nil {
*ack = false
log.Warning("[STORE]", err)
return err
}
*ack = true
return nil
}
func (n *Node) FindNode() {
func (n *Node) FindNode(ln kademlia.ListedNode, lns *[]kademlia.ListedNode) error {
log.Info("[rpc] FIND_NODE")
// k := n.kademlia.KBucket(ln.ID)
k, err := n.kademlia.FindClosestKBucket(ln.ID)
if err != nil {
*lns = []kademlia.ListedNode{}
return nil
}
log.Info("[FIND_NODE] k", k)
bucket := n.kademlia.KBuckets[k]
*lns = bucket
return nil
}
type FindValueResp struct {
Value []byte
Lns []kademlia.ListedNode
}
func (n *Node) FindValue() {
func (n *Node) FindValue(id kademlia.ID, resp *FindValueResp) error {
log.Info("[rpc] FIND_VALUE")
// first check if value is in this node storage
f, err := ioutil.ReadFile(config.C.Storage + "/" + id.String())
if err == nil {
// value exists, return it
*resp = FindValueResp{
Value: f,
}
return nil
}
// k := n.kademlia.KBucket(id)
k, err := n.kademlia.FindClosestKBucket(id)
if err != nil {
*resp = FindValueResp{}
return nil
}
log.Info("[FIND_VALUE] k", k)
// bucket := n.kademlia.KBuckets[k]
*resp = FindValueResp{
Lns: n.kademlia.KBuckets[k],
}
return nil
}

+ 132
- 18
rpc-test/test.go

@ -1,14 +1,146 @@
package main
import (
"errors"
"flag"
"fmt"
"go-dht/kademlia"
"go-dht/node"
"log"
"net/rpc"
"strconv"
)
// Utility to test the Node RPC methods
func main() {
pingFlag := flag.Bool("ping", false, "test Ping")
findnodeFlag := flag.Bool("findnode", false, "test FindNode")
findvalueFlag := flag.Bool("findvalue", false, "test FindValue")
storeFlag := flag.Bool("store", false, "test Store")
flag.Parse()
if *pingFlag {
testPing()
}
if *findnodeFlag {
testFindNode()
}
if *findvalueFlag {
testFindValue()
}
if *storeFlag {
testStore()
}
}
func testPing() {
lns := prepareTestListedNodes()
client, err := rpc.DialHTTP("tcp", "127.0.0.1:5000")
if err != nil {
log.Fatal("Connection error: ", err)
}
var reply kademlia.ListedNode
for _, ln := range lns {
err = client.Call("Node.Ping", ln, &reply)
if err != nil {
panic(err)
}
fmt.Println(reply)
}
}
func testFindNode() {
// existing node
id, err := kademlia.IDFromString("1ff734fb9897600ca54a9c55ace2d22a51afb610")
if err != nil {
panic(err)
}
ln := kademlia.ListedNode{
ID: id,
Addr: "",
Port: "",
}
client, err := rpc.DialHTTP("tcp", "127.0.0.1:5000")
if err != nil {
log.Fatal("Connection error: ", err)
}
var reply []kademlia.ListedNode
err = client.Call("Node.FindNode", ln, &reply)
if err != nil {
panic(err)
}
fmt.Println(reply)
// find non existing node, to get a closer one
id, err = kademlia.IDFromString("1ff734fb9897600ca54a9c55ace2d22a51aaaaaa")
if err != nil {
panic(err)
}
ln = kademlia.ListedNode{
ID: id,
Addr: "",
Port: "",
}
var reply2 []kademlia.ListedNode
err = client.Call("Node.FindNode", ln, &reply2)
if err != nil {
panic(err)
}
fmt.Println(reply2)
}
func testFindValue() {
client, err := rpc.DialHTTP("tcp", "127.0.0.1:5002")
if err != nil {
log.Fatal("Connection error: ", err)
}
// first store the value
data := []byte("test data0")
h := kademlia.HashData(data)
fmt.Println(h)
var reply bool
err = client.Call("Node.Store", data, &reply)
if err != nil {
panic(err)
}
fmt.Println(reply)
// now call FIND_VALUE
id, err := kademlia.IDFromString("1ff734fb9897600ca54a9c55ace2d22a51afb610")
if err != nil {
panic(err)
}
var reply2 node.FindValueResp
err = client.Call("Node.FindValue", id, &reply2)
if err != nil {
panic(err)
}
if len(reply2.Value) == 0 {
panic(errors.New("expected value response on FIND_VALUE"))
}
fmt.Println("FIND_VALUE response:", string(reply2.Value))
}
func testStore() {
client, err := rpc.DialHTTP("tcp", "127.0.0.1:5000")
if err != nil {
log.Fatal("Connection error: ", err)
}
var reply bool
for i := 0; i < 10; i++ {
err = client.Call("Node.Store", []byte("test data"+strconv.Itoa(i)), &reply)
if err != nil {
panic(err)
}
fmt.Println(reply)
}
}
func prepareTestListedNodes() []kademlia.ListedNode {
lnIDs := []string{
"c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b",
@ -45,21 +177,3 @@ func prepareTestListedNodes() []kademlia.ListedNode {
}
return lns
}
func main() {
lns := prepareTestListedNodes()
client, err := rpc.DialHTTP("tcp", "127.0.0.1:5000")
if err != nil {
log.Fatal("Connection error: ", err)
}
var reply kademlia.ListedNode
for _, ln := range lns {
err = client.Call("Node.Ping", ln, &reply)
if err != nil {
panic(err)
}
fmt.Println(reply)
}
}

Loading…
Cancel
Save