diff --git a/batch/batch.go b/batch/batch.go index 35465ea..9ee886b 100644 --- a/batch/batch.go +++ b/batch/batch.go @@ -1,10 +1,11 @@ package batch import ( + "encoding/json" "fmt" - "github.com/vocdoni/go-dvote/types" + "github.com/vocdoni/go-dvote/db" - "encoding/json" + "github.com/vocdoni/go-dvote/types" ) var rdb *db.LevelDbStorage @@ -18,6 +19,33 @@ func Setup(l *db.LevelDbStorage) { bdb = l.WithPrefix([]byte("batch_")) } +func Recieve(messages <-chan types.Message) { + var message types.Message + var payload []byte + var e types.Envelope + var b types.Ballot + for { + message = <-messages + payload = message.Data + + err = json.Unmarshal(payload, &e) + if err != nil { + //log error + } + + err = json.Unmarshal(e.Ballot, &b) + if err != nil { + //log error + } + + err = Add(b) + if err != nil { + //log error + } + + fmt.Println("Got > " + string(payload)) + } +} //add (queue for counting) func Add(ballot types.Ballot) error { @@ -28,7 +56,7 @@ func Add(ballot types.Ballot) error { var err error b, err = json.Marshal(ballot) n, err = json.Marshal(ballot.Nullifier) - err = bdb.Put(n,b) + err = bdb.Put(n, b) if err != nil { return err } @@ -59,14 +87,14 @@ func Fetch() ([]string, []string) { b = append(b, string(v)) } iter.Release() -// jn, err := json.Marshal(n) -// if err != nil { -// panic(err) -// } -// jb, err := json.Marshal(b) -// if err != nil { -// panic(err) -// } + // jn, err := json.Marshal(n) + // if err != nil { + // panic(err) + // } + // jb, err := json.Marshal(b) + // if err != nil { + // panic(err) + // } return n, b } diff --git a/cmd/relay/relay.go b/cmd/relay/relay.go index 438190c..059ae19 100644 --- a/cmd/relay/relay.go +++ b/cmd/relay/relay.go @@ -1,21 +1,23 @@ package main import ( - "fmt" - "time" - "encoding/gob" "bytes" - "os" + "encoding/gob" "flag" + "fmt" + "os" + "time" + "github.com/vocdoni/go-dvote/batch" - "github.com/vocdoni/go-dvote/net" - "github.com/vocdoni/go-dvote/db" "github.com/vocdoni/go-dvote/data" + "github.com/vocdoni/go-dvote/db" + "github.com/vocdoni/go-dvote/net" + "github.com/vocdoni/go-dvote/types" ) var dbPath = "~/.dvote/relay.db" var batchSeconds = 10 //seconds -var batchSize = 3 //packets +var batchSize = 3 //packets var err error var batchTimer *time.Ticker @@ -23,7 +25,6 @@ var batchSignal chan bool var signal bool var transportType net.TransportID - func main() { db, err := db.NewLevelDbStorage(dbPath, false) @@ -40,25 +41,26 @@ func main() { flag.Parse() transportType = net.TransportIDFromString(transportIDString) - batchTimer = time.NewTicker(time.Second * time.Duration(batchSeconds)) batchSignal = make(chan bool) batch.BatchSignal = batchSignal batch.BatchSize = batchSize - fmt.Println("Entering main loop") transport, err := net.Init(transportType) + listenerOutput := make(chan types.Message, 10) + listenerErrors := make(chan error) if err != nil { os.Exit(1) } - go transport.Listen() + go transport.Listen(listenerOutput, listenerErrors) + go batch.Recieve(listenerOutput) for { select { - case <- batchTimer.C: + case <-batchTimer.C: fmt.Println("Timer triggered") -// fmt.Println(batch.Create()) + // fmt.Println(batch.Create()) //replace with chain link case signal := <-batchSignal: if signal == true { @@ -78,6 +80,8 @@ func main() { //fmt.Println(b) batch.Compact(ns) } + case listenError := <-listenerErrors: + fmt.Println(listenError) default: continue } diff --git a/net/net.go b/net/net.go index aea17d4..b24615b 100644 --- a/net/net.go +++ b/net/net.go @@ -2,26 +2,28 @@ package net import ( "errors" + + "github.com/vocdoni/go-dvote/types" ) type Transport interface { - Listen() error - Init(c string) error + Listen(reciever chan<- types.Message, errors chan<- error) + Send(msg []byte, errors chan<- error) + Init() error } - type TransportID int const ( - HTTP TransportID = iota + 1 - PubSub + PubSub TransportID = iota + 1 + PSS ) func TransportIDFromString(i string) TransportID { switch i { - case "PubSub" : + case "PubSub": return PubSub - case "HTTP": - return HTTP + case "PSS": + return PSS default: return -1 } @@ -29,14 +31,22 @@ func TransportIDFromString(i string) TransportID { func Init(t TransportID) (Transport, error) { switch t { - case PubSub : + case PubSub: p := new(PubSubHandle) - p.Init("vocdoni_pubsub_testing") + defaultConnection := new(types.Connection) + defaultConnection.Topic = "vocdoni_testing" + p.c = defaultConnection + p.Init() + return p, nil + case PSS: + p := new(PSSHandle) + defaultConnection := new(types.Connection) + defaultConnection.Topic = "vocdoni_testing" + defaultConnection.Key = "" + defaultConnection.Kind = "sym" + p.c = defaultConnection + p.Init() return p, nil - case HTTP : - h := new(HttpHandle) - h.Init("8080/submit") - return h, nil default: return nil, errors.New("Bad transport type specification") } diff --git a/net/pubsub.go b/net/pubsub.go index 3434743..cf05128 100644 --- a/net/pubsub.go +++ b/net/pubsub.go @@ -1,18 +1,17 @@ package net import ( - "os" "fmt" - "encoding/json" + "os" + "time" shell "github.com/ipfs/go-ipfs-api" - "github.com/vocdoni/go-dvote/batch" "github.com/vocdoni/go-dvote/types" ) type PubSubHandle struct { - topic string - subscription *shell.PubSubSubscription + c *types.Connection + s *shell.PubSubSubscription } func PsSubscribe(topic string) *shell.PubSubSubscription { @@ -34,46 +33,34 @@ func PsPublish(topic, data string) error { return nil } -func (p *PubSubHandle) Init(topic string) error { - p.topic = topic - p.subscription = PsSubscribe(p.topic) +func (p *PubSubHandle) Init() error { + p.s = PsSubscribe(p.c.Topic) return nil } -func (p *PubSubHandle) Listen() error { - var msg *shell.Message +func (p *PubSubHandle) Listen(reciever chan<- types.Message, errors chan<- error) { + var psMessage *shell.Message + var msg types.Message var err error for { - msg, err = p.subscription.Next() + psMessage, err = p.s.Next() if err != nil { + errors <- err fmt.Fprintf(os.Stderr, "recieve error: %s", err) - return err - } - - payload := msg.Data - - var e types.Envelope - var b types.Ballot - - err = json.Unmarshal(payload, &e) - if err != nil { - return err } + msg.Topic = p.c.Topic + msg.Data = psMessage.Data + msg.Address = psMessage.From.String() + msg.TimeStamp = time.Now() - err = json.Unmarshal(e.Ballot, &b) - if err != nil { - return err - } + reciever <- msg - err = batch.Add(b) - if err != nil { - return err - } - - fmt.Println("Got > " + string(payload)) } } -func (p *PubSubHandle) Send(data string) error { - return PsPublish(p.topic, data) +func (p *PubSubHandle) Send(data []byte, errors chan<- error) { + err := PsPublish(p.c.Topic, string(data)) + if err != nil { + errors <- err + } } diff --git a/net/swarm/swarm.go b/net/swarm/swarm.go index 2ce116f..e392df5 100644 --- a/net/swarm/swarm.go +++ b/net/swarm/swarm.go @@ -112,7 +112,7 @@ func NewSwarmPorts() *swarmPorts { return sp } -type pssMsg struct { +type PssMsg struct { Msg []byte Peer *p2p.Peer Asym bool @@ -121,7 +121,7 @@ type pssMsg struct { type pssSub struct { Unregister func() - Delivery (chan pssMsg) + Delivery (chan PssMsg) Address string } @@ -281,12 +281,12 @@ func (sn *SimplePss) PssSub(subType, key, topic, address string) error { sn.PssTopics[topic] = new(pssSub) sn.PssTopics[topic].Address = address - sn.PssTopics[topic].Delivery = make(chan pssMsg) + sn.PssTopics[topic].Delivery = make(chan PssMsg) var pssHandler pss.HandlerFunc = func(msg []byte, peer *p2p.Peer, asym bool, keyid string) error { log.Debug("pss received", "msg", fmt.Sprintf("%s", msg), "keyid", fmt.Sprintf("%s", keyid)) - sn.PssTopics[topic].Delivery <- pssMsg{Msg: msg, Peer: peer, Asym: asym, Keyid: keyid} + sn.PssTopics[topic].Delivery <- PssMsg{Msg: msg, Peer: peer, Asym: asym, Keyid: keyid} return nil } topicHandler := pss.NewHandler(pssHandler) diff --git a/types/types.go b/types/types.go index 03eee3a..a4b626d 100644 --- a/types/types.go +++ b/types/types.go @@ -4,28 +4,41 @@ import ( "time" ) +type Message struct { + Topic string + Data []byte + Address string + TimeStamp time.Time +} + +type Connection struct { + Topic string + Key string + Kind string + Address string +} + type Ballot struct { - Type string - PID string + Type string + PID string Nullifier []byte - Vote []byte + Vote []byte Franchise []byte } type Envelope struct { - Type string - Nonce uint64 - KeyProof []byte - Ballot []byte + Type string + Nonce uint64 + KeyProof []byte + Ballot []byte Timestamp time.Time } type Batch struct { - Type string + Type string Nullifiers []string - URL string - TXID string - Nonce []byte - Signature string + URL string + TXID string + Nonce []byte + Signature string } -