Browse Source

refactored net module for channel based communication to batch reciever

feature/pss_integration
imw 5 years ago
parent
commit
99bd03a93e
6 changed files with 131 additions and 89 deletions
  1. +39
    -11
      batch/batch.go
  2. +17
    -13
      cmd/relay/relay.go
  3. +24
    -14
      net/net.go
  4. +21
    -34
      net/pubsub.go
  5. +4
    -4
      net/swarm/swarm.go
  6. +26
    -13
      types/types.go

+ 39
- 11
batch/batch.go

@ -1,10 +1,11 @@
package batch package batch
import ( import (
"encoding/json"
"fmt" "fmt"
"github.com/vocdoni/go-dvote/types"
"github.com/vocdoni/go-dvote/db" "github.com/vocdoni/go-dvote/db"
"encoding/json"
"github.com/vocdoni/go-dvote/types"
) )
var rdb *db.LevelDbStorage var rdb *db.LevelDbStorage
@ -18,6 +19,33 @@ func Setup(l *db.LevelDbStorage) {
bdb = l.WithPrefix([]byte("batch_")) bdb = l.WithPrefix([]byte("batch_"))
} }
func Recieve(messages <-chan types.Message) {
var message types.Message
var payload []byte
var e types.Envelope
var b types.Ballot
for {
message = <-messages
payload = message.Data
err = json.Unmarshal(payload, &e)
if err != nil {
//log error
}
err = json.Unmarshal(e.Ballot, &b)
if err != nil {
//log error
}
err = Add(b)
if err != nil {
//log error
}
fmt.Println("Got > " + string(payload))
}
}
//add (queue for counting) //add (queue for counting)
func Add(ballot types.Ballot) error { func Add(ballot types.Ballot) error {
@ -28,7 +56,7 @@ func Add(ballot types.Ballot) error {
var err error var err error
b, err = json.Marshal(ballot) b, err = json.Marshal(ballot)
n, err = json.Marshal(ballot.Nullifier) n, err = json.Marshal(ballot.Nullifier)
err = bdb.Put(n,b)
err = bdb.Put(n, b)
if err != nil { if err != nil {
return err return err
} }
@ -59,14 +87,14 @@ func Fetch() ([]string, []string) {
b = append(b, string(v)) b = append(b, string(v))
} }
iter.Release() iter.Release()
// jn, err := json.Marshal(n)
// if err != nil {
// panic(err)
// }
// jb, err := json.Marshal(b)
// if err != nil {
// panic(err)
// }
// jn, err := json.Marshal(n)
// if err != nil {
// panic(err)
// }
// jb, err := json.Marshal(b)
// if err != nil {
// panic(err)
// }
return n, b return n, b
} }

+ 17
- 13
cmd/relay/relay.go

@ -1,21 +1,23 @@
package main package main
import ( import (
"fmt"
"time"
"encoding/gob"
"bytes" "bytes"
"os"
"encoding/gob"
"flag" "flag"
"fmt"
"os"
"time"
"github.com/vocdoni/go-dvote/batch" "github.com/vocdoni/go-dvote/batch"
"github.com/vocdoni/go-dvote/net"
"github.com/vocdoni/go-dvote/db"
"github.com/vocdoni/go-dvote/data" "github.com/vocdoni/go-dvote/data"
"github.com/vocdoni/go-dvote/db"
"github.com/vocdoni/go-dvote/net"
"github.com/vocdoni/go-dvote/types"
) )
var dbPath = "~/.dvote/relay.db" var dbPath = "~/.dvote/relay.db"
var batchSeconds = 10 //seconds var batchSeconds = 10 //seconds
var batchSize = 3 //packets
var batchSize = 3 //packets
var err error var err error
var batchTimer *time.Ticker var batchTimer *time.Ticker
@ -23,7 +25,6 @@ var batchSignal chan bool
var signal bool var signal bool
var transportType net.TransportID var transportType net.TransportID
func main() { func main() {
db, err := db.NewLevelDbStorage(dbPath, false) db, err := db.NewLevelDbStorage(dbPath, false)
@ -40,25 +41,26 @@ func main() {
flag.Parse() flag.Parse()
transportType = net.TransportIDFromString(transportIDString) transportType = net.TransportIDFromString(transportIDString)
batchTimer = time.NewTicker(time.Second * time.Duration(batchSeconds)) batchTimer = time.NewTicker(time.Second * time.Duration(batchSeconds))
batchSignal = make(chan bool) batchSignal = make(chan bool)
batch.BatchSignal = batchSignal batch.BatchSignal = batchSignal
batch.BatchSize = batchSize batch.BatchSize = batchSize
fmt.Println("Entering main loop") fmt.Println("Entering main loop")
transport, err := net.Init(transportType) transport, err := net.Init(transportType)
listenerOutput := make(chan types.Message, 10)
listenerErrors := make(chan error)
if err != nil { if err != nil {
os.Exit(1) os.Exit(1)
} }
go transport.Listen()
go transport.Listen(listenerOutput, listenerErrors)
go batch.Recieve(listenerOutput)
for { for {
select { select {
case <- batchTimer.C:
case <-batchTimer.C:
fmt.Println("Timer triggered") fmt.Println("Timer triggered")
// fmt.Println(batch.Create())
// fmt.Println(batch.Create())
//replace with chain link //replace with chain link
case signal := <-batchSignal: case signal := <-batchSignal:
if signal == true { if signal == true {
@ -78,6 +80,8 @@ func main() {
//fmt.Println(b) //fmt.Println(b)
batch.Compact(ns) batch.Compact(ns)
} }
case listenError := <-listenerErrors:
fmt.Println(listenError)
default: default:
continue continue
} }

+ 24
- 14
net/net.go

@ -2,26 +2,28 @@ package net
import ( import (
"errors" "errors"
"github.com/vocdoni/go-dvote/types"
) )
type Transport interface { type Transport interface {
Listen() error
Init(c string) error
Listen(reciever chan<- types.Message, errors chan<- error)
Send(msg []byte, errors chan<- error)
Init() error
} }
type TransportID int type TransportID int
const ( const (
HTTP TransportID = iota + 1
PubSub
PubSub TransportID = iota + 1
PSS
) )
func TransportIDFromString(i string) TransportID { func TransportIDFromString(i string) TransportID {
switch i { switch i {
case "PubSub" :
case "PubSub":
return PubSub return PubSub
case "HTTP":
return HTTP
case "PSS":
return PSS
default: default:
return -1 return -1
} }
@ -29,14 +31,22 @@ func TransportIDFromString(i string) TransportID {
func Init(t TransportID) (Transport, error) { func Init(t TransportID) (Transport, error) {
switch t { switch t {
case PubSub :
case PubSub:
p := new(PubSubHandle) p := new(PubSubHandle)
p.Init("vocdoni_pubsub_testing")
defaultConnection := new(types.Connection)
defaultConnection.Topic = "vocdoni_testing"
p.c = defaultConnection
p.Init()
return p, nil
case PSS:
p := new(PSSHandle)
defaultConnection := new(types.Connection)
defaultConnection.Topic = "vocdoni_testing"
defaultConnection.Key = ""
defaultConnection.Kind = "sym"
p.c = defaultConnection
p.Init()
return p, nil return p, nil
case HTTP :
h := new(HttpHandle)
h.Init("8080/submit")
return h, nil
default: default:
return nil, errors.New("Bad transport type specification") return nil, errors.New("Bad transport type specification")
} }

+ 21
- 34
net/pubsub.go

@ -1,18 +1,17 @@
package net package net
import ( import (
"os"
"fmt" "fmt"
"encoding/json"
"os"
"time"
shell "github.com/ipfs/go-ipfs-api" shell "github.com/ipfs/go-ipfs-api"
"github.com/vocdoni/go-dvote/batch"
"github.com/vocdoni/go-dvote/types" "github.com/vocdoni/go-dvote/types"
) )
type PubSubHandle struct { type PubSubHandle struct {
topic string
subscription *shell.PubSubSubscription
c *types.Connection
s *shell.PubSubSubscription
} }
func PsSubscribe(topic string) *shell.PubSubSubscription { func PsSubscribe(topic string) *shell.PubSubSubscription {
@ -34,46 +33,34 @@ func PsPublish(topic, data string) error {
return nil return nil
} }
func (p *PubSubHandle) Init(topic string) error {
p.topic = topic
p.subscription = PsSubscribe(p.topic)
func (p *PubSubHandle) Init() error {
p.s = PsSubscribe(p.c.Topic)
return nil return nil
} }
func (p *PubSubHandle) Listen() error {
var msg *shell.Message
func (p *PubSubHandle) Listen(reciever chan<- types.Message, errors chan<- error) {
var psMessage *shell.Message
var msg types.Message
var err error var err error
for { for {
msg, err = p.subscription.Next()
psMessage, err = p.s.Next()
if err != nil { if err != nil {
errors <- err
fmt.Fprintf(os.Stderr, "recieve error: %s", err) fmt.Fprintf(os.Stderr, "recieve error: %s", err)
return err
}
payload := msg.Data
var e types.Envelope
var b types.Ballot
err = json.Unmarshal(payload, &e)
if err != nil {
return err
} }
msg.Topic = p.c.Topic
msg.Data = psMessage.Data
msg.Address = psMessage.From.String()
msg.TimeStamp = time.Now()
err = json.Unmarshal(e.Ballot, &b)
if err != nil {
return err
}
reciever <- msg
err = batch.Add(b)
if err != nil {
return err
}
fmt.Println("Got > " + string(payload))
} }
} }
func (p *PubSubHandle) Send(data string) error {
return PsPublish(p.topic, data)
func (p *PubSubHandle) Send(data []byte, errors chan<- error) {
err := PsPublish(p.c.Topic, string(data))
if err != nil {
errors <- err
}
} }

+ 4
- 4
net/swarm/swarm.go

@ -112,7 +112,7 @@ func NewSwarmPorts() *swarmPorts {
return sp return sp
} }
type pssMsg struct {
type PssMsg struct {
Msg []byte Msg []byte
Peer *p2p.Peer Peer *p2p.Peer
Asym bool Asym bool
@ -121,7 +121,7 @@ type pssMsg struct {
type pssSub struct { type pssSub struct {
Unregister func() Unregister func()
Delivery (chan pssMsg)
Delivery (chan PssMsg)
Address string Address string
} }
@ -281,12 +281,12 @@ func (sn *SimplePss) PssSub(subType, key, topic, address string) error {
sn.PssTopics[topic] = new(pssSub) sn.PssTopics[topic] = new(pssSub)
sn.PssTopics[topic].Address = address sn.PssTopics[topic].Address = address
sn.PssTopics[topic].Delivery = make(chan pssMsg)
sn.PssTopics[topic].Delivery = make(chan PssMsg)
var pssHandler pss.HandlerFunc = func(msg []byte, peer *p2p.Peer, asym bool, keyid string) error { var pssHandler pss.HandlerFunc = func(msg []byte, peer *p2p.Peer, asym bool, keyid string) error {
log.Debug("pss received", "msg", fmt.Sprintf("%s", msg), "keyid", fmt.Sprintf("%s", keyid)) log.Debug("pss received", "msg", fmt.Sprintf("%s", msg), "keyid", fmt.Sprintf("%s", keyid))
sn.PssTopics[topic].Delivery <- pssMsg{Msg: msg, Peer: peer, Asym: asym, Keyid: keyid}
sn.PssTopics[topic].Delivery <- PssMsg{Msg: msg, Peer: peer, Asym: asym, Keyid: keyid}
return nil return nil
} }
topicHandler := pss.NewHandler(pssHandler) topicHandler := pss.NewHandler(pssHandler)

+ 26
- 13
types/types.go

@ -4,28 +4,41 @@ import (
"time" "time"
) )
type Message struct {
Topic string
Data []byte
Address string
TimeStamp time.Time
}
type Connection struct {
Topic string
Key string
Kind string
Address string
}
type Ballot struct { type Ballot struct {
Type string
PID string
Type string
PID string
Nullifier []byte Nullifier []byte
Vote []byte
Vote []byte
Franchise []byte Franchise []byte
} }
type Envelope struct { type Envelope struct {
Type string
Nonce uint64
KeyProof []byte
Ballot []byte
Type string
Nonce uint64
KeyProof []byte
Ballot []byte
Timestamp time.Time Timestamp time.Time
} }
type Batch struct { type Batch struct {
Type string
Type string
Nullifiers []string Nullifiers []string
URL string
TXID string
Nonce []byte
Signature string
URL string
TXID string
Nonce []byte
Signature string
} }

Loading…
Cancel
Save