diff --git a/.gitignore b/.gitignore index 2c53fa7..3c71db7 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ tmp.go +tmp diff --git a/config.test0.yaml b/config.test0.yaml index f47db35..cf6956c 100644 --- a/config.test0.yaml +++ b/config.test0.yaml @@ -1,3 +1,4 @@ id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7" addr: 127.0.0.1 port: 5000 +storage: "tmp" diff --git a/config.test1.yaml b/config.test1.yaml index 68b71b3..ccf8d4a 100644 --- a/config.test1.yaml +++ b/config.test1.yaml @@ -1,7 +1,8 @@ -id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d0" +id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d5" addr: 127.0.0.1 port: 5001 knownNodes: - id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7" addr: 127.0.0.1 port: 5000 +storage: "tmp" diff --git a/config.test2.yaml b/config.test2.yaml index d9ede15..9a48695 100644 --- a/config.test2.yaml +++ b/config.test2.yaml @@ -1,7 +1,8 @@ -id: "c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b" +id: "1ff734fb9897600ca54a9c55ace2d22a51afb610" addr: 127.0.0.1 port: 5002 knownNodes: - id: "0fd85ddddf15aeec2d5d8b01b013dbca030a18d7" addr: 127.0.0.1 port: 5000 +storage: "tmp" diff --git a/config/config.go b/config/config.go index e519c89..0ea89f7 100644 --- a/config/config.go +++ b/config/config.go @@ -13,6 +13,7 @@ type Config struct { Port string KnownNodesStr []KnownNodeStr `mapstructure:"knownnodes"` KnownNodes []kademlia.ListedNode `mapstructure:"-"` + Storage string } type KnownNodeStr struct { diff --git a/kademlia/id.go b/kademlia/id.go index 6ad4767..a82e6c5 100644 --- a/kademlia/id.go +++ b/kademlia/id.go @@ -3,8 +3,8 @@ package kademlia import ( "bytes" "crypto/rand" + "crypto/sha256" "encoding/hex" - "fmt" ) type ID [B]byte @@ -29,8 +29,6 @@ func (id ID) MarshalText() ([]byte, error) { } func (id *ID) UnmarshalText(data []byte) error { - fmt.Println("UNMARSHAL") - fmt.Println("d", string(data)) var err error var idFromStr ID idFromStr, err = IDFromString(string(data)) @@ -72,3 +70,10 @@ func (idA ID) Distance(idB ID) ID { } return d } + +func HashData(d []byte) ID { + h := sha256.Sum256(d) + var r ID + copy(r[:], h[:20]) + return r +} diff --git a/kademlia/id_test.go b/kademlia/id_test.go index 9ee8d9a..3626f22 100644 --- a/kademlia/id_test.go +++ b/kademlia/id_test.go @@ -1,8 +1,8 @@ package kademlia import ( + "encoding/hex" "encoding/json" - "fmt" "testing" "github.com/stretchr/testify/assert" @@ -28,7 +28,6 @@ func TestIDMarshalers(t *testing.T) { idStr, err := json.Marshal(id) assert.Nil(t, err) assert.Equal(t, "\"0fd85ddddf15aeec2d5d8b01b013dbca030a18d7\"", string(idStr)) - fmt.Println("idStr", string(idStr)) var idParsed ID err = json.Unmarshal(idStr, &idParsed) @@ -56,3 +55,8 @@ func TestIDDistance(t *testing.T) { assert.Nil(t, err) assert.Equal(t, "cb55d68e04fa18e5c0131fd236ce80e8ecc1348c", idA.Distance(idB).String()) } + +func TestHashData(t *testing.T) { + h := HashData([]byte("test data")) + assert.Equal(t, "916f0027a575074ce72a331777c3478d6513f786", hex.EncodeToString(h[:])) +} diff --git a/kademlia/kademlia.go b/kademlia/kademlia.go index 4ed31c3..7491469 100644 --- a/kademlia/kademlia.go +++ b/kademlia/kademlia.go @@ -1,6 +1,7 @@ package kademlia import ( + "errors" "math/bits" "net/rpc" "strconv" @@ -23,7 +24,7 @@ type ListedNode struct { type Kademlia struct { // N is this node data N ListedNode - KBuckets [B * 8][]ListedNode + KBuckets [B][]ListedNode } func NewKademliaTable(id ID, addr, port string) *Kademlia { @@ -49,6 +50,22 @@ func (kad Kademlia) String() string { return r } +func (kad Kademlia) FindClosestKBucket(id ID) (int, error) { + kb := kad.KBucket(id) + if len(kad.KBuckets[kb]) != 0 { + return kb, nil + } + for i := 0; kb-i > 0 || kb+i < 8; i++ { + if len(kad.KBuckets[kb+i]) != 0 { + return kb + i, nil + } + if len(kad.KBuckets[kb-i]) != 0 { + return kb - i, nil + } + } + return 0, errors.New("not found") +} + func (kad Kademlia) KBucket(o ID) int { d := kad.N.ID.Distance(o) return kBucketByDistance(d[:]) @@ -56,9 +73,22 @@ func (kad Kademlia) KBucket(o ID) int { } func kBucketByDistance(b []byte) int { + z := countZeroes(b) + if z == 0 { + return 0 + } + for i := 0; i < 8; i++ { + if z>>uint8(i) == 1 { // check until 1 instead of 0 to avoid one operation + return i + } + } + return 7 +} + +func countZeroes(b []byte) int { for i := 0; i < B; i++ { for a := b[i] ^ 0; a != 0; a &= a - 1 { - return (B-1-i)*8 + (7 - bits.TrailingZeros8(bits.Reverse8(uint8(a)))) + return (B * 8) - (i * 8) - (8 - bits.TrailingZeros8(bits.Reverse8(uint8(a)))) } } return (B*8 - 1) - (B*8 - 1) @@ -69,7 +99,7 @@ func (kad *Kademlia) Update(o ListedNode) { kb := kad.KBuckets[k] if len(kb) >= KBucketSize { // if n.KBuckets[k] is alrady full, perform ping of the first element - log.Info("node.KBuckets[k] already full, performing ping to node.KBuckets[0]") + log.Debug("node.KBuckets[k] already full, performing ping to node.KBuckets[0]") kad.PingOldNode(k, o) return } @@ -78,12 +108,12 @@ func (kad *Kademlia) Update(o ListedNode) { if exist { // update position of o to the bottom kad.KBuckets[k] = moveToBottom(kad.KBuckets[k], pos) - log.Info("ListedNode already exists, moved to bottom") + log.Debug("ListedNode (" + o.ID.String() + ") already exists, moved to bottom") return } // not exists, add it to the kBucket kad.KBuckets[k] = append(kad.KBuckets[k], o) - log.Info("ListedNode not exists, added to the bottom") + log.Debug("ListedNode (" + o.ID.String() + ") not exists, added to the bottom") return } @@ -109,6 +139,7 @@ func (kad *Kademlia) CallPing(o ListedNode) error { if err != nil { return err } + // TODO use reply as PONG return nil } diff --git a/kademlia/kademlia_test.go b/kademlia/kademlia_test.go index 6fac2c2..8e5ed9d 100644 --- a/kademlia/kademlia_test.go +++ b/kademlia/kademlia_test.go @@ -4,37 +4,54 @@ import ( "fmt" "testing" + log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" ) var debug = false +func init() { + log.SetLevel(log.DebugLevel) +} + func TestCountZeros(t *testing.T) { zeroes := []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} assert.Equal(t, 0, kBucketByDistance(zeroes)) b := []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} assert.Equal(t, 20, len(b)) - assert.Equal(t, 159, kBucketByDistance(b)) + assert.Equal(t, 7, kBucketByDistance(b)) b[19] = 0x00 - assert.Equal(t, 159, kBucketByDistance(b)) + assert.Equal(t, 7, kBucketByDistance(b)) b[0] = 0x0f - assert.Equal(t, 159-4, kBucketByDistance(b)) + assert.Equal(t, 7, kBucketByDistance(b)) b[0] = 0x0c - assert.Equal(t, 159-4, kBucketByDistance(b)) + assert.Equal(t, 7, kBucketByDistance(b)) b[0] = 0x00 b[1] = 0x00 b[2] = 0x0f - assert.Equal(t, 159-20, kBucketByDistance(b)) + assert.Equal(t, 7, kBucketByDistance(b)) b[2] = 0x07 - assert.Equal(t, 159-21, kBucketByDistance(b)) + assert.Equal(t, 7, kBucketByDistance(b)) b[2] = 0x03 - assert.Equal(t, 159-22, kBucketByDistance(b)) + assert.Equal(t, 7, kBucketByDistance(b)) + + b = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + b[19] = 0x01 + assert.Equal(t, 2, kBucketByDistance(b)) + b[19] = 0x05 + assert.Equal(t, 2, kBucketByDistance(b)) + + b[19] = 0x10 + assert.Equal(t, 1, kBucketByDistance(b)) + + b[18] = 0x10 + assert.Equal(t, 3, kBucketByDistance(b)) } func TestKBucket(t *testing.T) { @@ -49,7 +66,7 @@ func TestKBucket(t *testing.T) { assert.Nil(t, err) d = kademlia.KBucket(idB) - assert.Equal(t, 159, d) + assert.Equal(t, 7, d) } func prepareTestListedNodes() []ListedNode { @@ -111,8 +128,55 @@ func TestUpdate(t *testing.T) { fmt.Println(kademlia) } - assert.Equal(t, len(kademlia.KBuckets[0]), 1) - assert.Equal(t, len(kademlia.KBuckets[1]), 1) - assert.Equal(t, len(kademlia.KBuckets[158]), 4) - assert.Equal(t, len(kademlia.KBuckets[159]), 5) + assert.Equal(t, 2, len(kademlia.KBuckets[0])) + assert.Equal(t, 0, len(kademlia.KBuckets[1])) + assert.Equal(t, 2, len(kademlia.KBuckets[2])) + assert.Equal(t, 0, len(kademlia.KBuckets[3])) + assert.Equal(t, 14, len(kademlia.KBuckets[7])) +} + +func TestFindClosestKBucket(t *testing.T) { + idA, err := IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a18d7") + assert.Nil(t, err) + kademlia := NewKademliaTable(idA, "127.0.0.1", "5000") + + lns := prepareTestListedNodes() + for _, lnI := range lns { + kademlia.Update(lnI) + } + + if debug { + fmt.Println(kademlia) + } + + idB, err := IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a18d5") + assert.Nil(t, err) + + k, err := kademlia.FindClosestKBucket(idB) + assert.Nil(t, err) + assert.Equal(t, 2, k) + + idB, err = IDFromString("0fd85ddddf15aeec2d5d8b01b013dbca030a1000") + assert.Nil(t, err) + + // the theorical KBucket should be 3 + k = kademlia.KBucket(idB) + assert.Equal(t, 3, k) + + // while the real KBucket (as the 3 is empty), should be 2 + k, err = kademlia.FindClosestKBucket(idB) + assert.Nil(t, err) + assert.Equal(t, 2, k) + + idB, err = IDFromString("0fd85ddddf15aeec2d5d8b01b013dbc000000000") + assert.Nil(t, err) + + // the theorical KBucket should be 3 + k = kademlia.KBucket(idB) + assert.Equal(t, 5, k) + + // while the real KBucket (as the 3 is empty), should be 2 + k, err = kademlia.FindClosestKBucket(idB) + assert.Nil(t, err) + assert.Equal(t, 7, k) } diff --git a/node/node.go b/node/node.go index 6a55882..b359fb7 100644 --- a/node/node.go +++ b/node/node.go @@ -1,9 +1,11 @@ package node import ( + "encoding/hex" "fmt" "go-dht/config" "go-dht/kademlia" + "io/ioutil" "net" "net/http" "net/rpc" @@ -108,15 +110,64 @@ func (n *Node) Pong(ln kademlia.ListedNode, ack *bool) error { func (n *Node) Store(data []byte, ack *bool) error { log.Info("[rpc] STORE") + h := kademlia.HashData(data) + if n.kademlia.KBucket(h) != 0 { + *ack = false + log.Warning("[STORE] data not for this node") + return nil + } + err := ioutil.WriteFile(config.C.Storage+"/"+hex.EncodeToString(h[:]), data, 0644) + if err != nil { + *ack = false + log.Warning("[STORE]", err) + return err + } + *ack = true return nil } -func (n *Node) FindNode() { +func (n *Node) FindNode(ln kademlia.ListedNode, lns *[]kademlia.ListedNode) error { log.Info("[rpc] FIND_NODE") + // k := n.kademlia.KBucket(ln.ID) + k, err := n.kademlia.FindClosestKBucket(ln.ID) + if err != nil { + *lns = []kademlia.ListedNode{} + return nil + } + log.Info("[FIND_NODE] k", k) + bucket := n.kademlia.KBuckets[k] + *lns = bucket + return nil +} +type FindValueResp struct { + Value []byte + Lns []kademlia.ListedNode } -func (n *Node) FindValue() { +func (n *Node) FindValue(id kademlia.ID, resp *FindValueResp) error { log.Info("[rpc] FIND_VALUE") + // first check if value is in this node storage + f, err := ioutil.ReadFile(config.C.Storage + "/" + id.String()) + if err == nil { + // value exists, return it + *resp = FindValueResp{ + Value: f, + } + return nil + } + + // k := n.kademlia.KBucket(id) + k, err := n.kademlia.FindClosestKBucket(id) + if err != nil { + *resp = FindValueResp{} + return nil + } + log.Info("[FIND_VALUE] k", k) + // bucket := n.kademlia.KBuckets[k] + *resp = FindValueResp{ + Lns: n.kademlia.KBuckets[k], + } + return nil } diff --git a/rpc-test/test.go b/rpc-test/test.go index f58d706..2d1211b 100644 --- a/rpc-test/test.go +++ b/rpc-test/test.go @@ -1,14 +1,146 @@ package main import ( + "errors" + "flag" "fmt" "go-dht/kademlia" + "go-dht/node" "log" "net/rpc" + "strconv" ) // Utility to test the Node RPC methods +func main() { + pingFlag := flag.Bool("ping", false, "test Ping") + findnodeFlag := flag.Bool("findnode", false, "test FindNode") + findvalueFlag := flag.Bool("findvalue", false, "test FindValue") + storeFlag := flag.Bool("store", false, "test Store") + flag.Parse() + + if *pingFlag { + testPing() + } + if *findnodeFlag { + testFindNode() + } + if *findvalueFlag { + testFindValue() + } + if *storeFlag { + testStore() + } +} + +func testPing() { + lns := prepareTestListedNodes() + client, err := rpc.DialHTTP("tcp", "127.0.0.1:5000") + if err != nil { + log.Fatal("Connection error: ", err) + } + + var reply kademlia.ListedNode + for _, ln := range lns { + err = client.Call("Node.Ping", ln, &reply) + if err != nil { + panic(err) + } + fmt.Println(reply) + } +} + +func testFindNode() { + // existing node + id, err := kademlia.IDFromString("1ff734fb9897600ca54a9c55ace2d22a51afb610") + if err != nil { + panic(err) + } + ln := kademlia.ListedNode{ + ID: id, + Addr: "", + Port: "", + } + client, err := rpc.DialHTTP("tcp", "127.0.0.1:5000") + if err != nil { + log.Fatal("Connection error: ", err) + } + + var reply []kademlia.ListedNode + err = client.Call("Node.FindNode", ln, &reply) + if err != nil { + panic(err) + } + fmt.Println(reply) + + // find non existing node, to get a closer one + id, err = kademlia.IDFromString("1ff734fb9897600ca54a9c55ace2d22a51aaaaaa") + if err != nil { + panic(err) + } + ln = kademlia.ListedNode{ + ID: id, + Addr: "", + Port: "", + } + + var reply2 []kademlia.ListedNode + err = client.Call("Node.FindNode", ln, &reply2) + if err != nil { + panic(err) + } + fmt.Println(reply2) +} + +func testFindValue() { + client, err := rpc.DialHTTP("tcp", "127.0.0.1:5002") + if err != nil { + log.Fatal("Connection error: ", err) + } + // first store the value + data := []byte("test data0") + h := kademlia.HashData(data) + fmt.Println(h) + var reply bool + err = client.Call("Node.Store", data, &reply) + if err != nil { + panic(err) + } + fmt.Println(reply) + + // now call FIND_VALUE + id, err := kademlia.IDFromString("1ff734fb9897600ca54a9c55ace2d22a51afb610") + if err != nil { + panic(err) + } + var reply2 node.FindValueResp + err = client.Call("Node.FindValue", id, &reply2) + if err != nil { + panic(err) + } + if len(reply2.Value) == 0 { + panic(errors.New("expected value response on FIND_VALUE")) + } + fmt.Println("FIND_VALUE response:", string(reply2.Value)) +} + +func testStore() { + client, err := rpc.DialHTTP("tcp", "127.0.0.1:5000") + if err != nil { + log.Fatal("Connection error: ", err) + } + var reply bool + for i := 0; i < 10; i++ { + err = client.Call("Node.Store", []byte("test data"+strconv.Itoa(i)), &reply) + if err != nil { + panic(err) + } + fmt.Println(reply) + } + +} + func prepareTestListedNodes() []kademlia.ListedNode { lnIDs := []string{ "c48d8b53dbefb609ed4e94d386dd5b22efcb2c5b", @@ -45,21 +177,3 @@ func prepareTestListedNodes() []kademlia.ListedNode { } return lns } - -func main() { - lns := prepareTestListedNodes() - - client, err := rpc.DialHTTP("tcp", "127.0.0.1:5000") - if err != nil { - log.Fatal("Connection error: ", err) - } - - var reply kademlia.ListedNode - for _, ln := range lns { - err = client.Call("Node.Ping", ln, &reply) - if err != nil { - panic(err) - } - fmt.Println(reply) - } -}