From 5ce8bb33ee36b46b0306a14fda20f5f7cb2f9afa Mon Sep 17 00:00:00 2001 From: imw Date: Thu, 7 Feb 2019 12:06:10 +0100 Subject: [PATCH] initial attempt at modular transports, fails with segfault --- cmd/relay/relay.go | 17 ++++++++-- data/data.go | 24 ++------------ net/http.go | 81 ++++++++++++++++++++++++++++++++++++++++++++++ net/net.go | 66 ++++++++++++++++++------------------- net/pubsub.go | 79 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 210 insertions(+), 57 deletions(-) create mode 100644 net/http.go create mode 100644 net/pubsub.go diff --git a/cmd/relay/relay.go b/cmd/relay/relay.go index 4adcdf9..0de424e 100644 --- a/cmd/relay/relay.go +++ b/cmd/relay/relay.go @@ -5,6 +5,8 @@ import ( "time" "encoding/gob" "bytes" + "os" + "flag" "github.com/vocdoni/dvote-relay/batch" "github.com/vocdoni/dvote-relay/net" "github.com/vocdoni/dvote-relay/db" @@ -19,6 +21,7 @@ var err error var batchTimer *time.Ticker var batchSignal chan bool var signal bool +var transportType net.TransportID func main() { @@ -31,16 +34,26 @@ func main() { batch.Setup(db) + //gather transport type flag + var transportIDString string + flag.StringVar(&transportIDString, "transport", "PubSub", "Transport must be one of: PubSub, HTTP") + flag.Parse() + transportType = net.TransportIDFromString(transportIDString) + + batchTimer = time.NewTicker(time.Second * time.Duration(batchSeconds)) batchSignal = make(chan bool) batch.BatchSignal = batchSignal batch.BatchSize = batchSize - topic := "vocdoni_pubsub_testing" fmt.Println("Entering main loop") - go net.Sub(topic) + transport, err := net.Init(transportType) + if err != nil { + os.Exit(1) + } + go transport.Listen() for { select { case <- batchTimer.C: diff --git a/data/data.go b/data/data.go index 036ee67..3071b3e 100644 --- a/data/data.go +++ b/data/data.go @@ -9,8 +9,9 @@ import ( shell "github.com/ipfs/go-ipfs-api" ) -type Record struct { - Shell *shell.Message +type Storage interface { + Publish(o []byte) string + Retrieve(id string) []byte } func Publish(object []byte) string { @@ -46,22 +47,3 @@ func Retrieve(hash string) []byte { } return content } - -func PsSubscribe(topic string) *shell.PubSubSubscription { - sh := shell.NewShell("localhost:5001") - sub, err := sh.PubSubSubscribe(topic) - if err != nil { - fmt.Fprintf(os.Stderr, "error: %s", err) - os.Exit(1) - } - return sub -} - -func PsPublish(topic, data string) error { - sh := shell.NewShell("localhost:5001") - err := sh.PubSubPublish(topic, data) - if err != nil { - return err - } - return nil -} diff --git a/net/http.go b/net/http.go new file mode 100644 index 0000000..7bcecac --- /dev/null +++ b/net/http.go @@ -0,0 +1,81 @@ +package net + +import ( + "io" + "fmt" + "net/http" + "encoding/json" + "strings" + + "github.com/vocdoni/dvote-relay/batch" + "github.com/vocdoni/dvote-relay/types" +) + +type HttpHandle struct { + port string + path string +} + +func (h HttpHandle) Init(c string) error { + //split c to port and path + cs := strings.Split(c, "/") + h.port = cs[0] + h.path = cs[1] + return nil + +} + +func parse(rw http.ResponseWriter, request *http.Request) { + decoder := json.NewDecoder(request.Body) + + var e types.Envelope + var b types.Ballot + + err := decoder.Decode(&e) + if err != nil { + panic(err) + } + + err = json.Unmarshal(e.Ballot, &b) + if err != nil { + panic(err) + } + + + //check PoW + //check key + //decrypt + //check franchise + //construct packet + + //this should should be randomized, or actually taken from input + //b.PID = "1" + //b.Nullifier = []byte{1,2,3} + //b.Vote = []byte{4,5,6} + //b.Franchise = []byte{7,8,9} + + err = batch.Add(b) + if err != nil { + panic(err) + } + + j, err := json.Marshal(e) + if err != nil { + panic(err) + } + io.WriteString(rw, string(j)) +} + +func (h HttpHandle) Listen() error { + http.HandleFunc(h.path, parse) + //add waitgroup + func() { + fmt.Println("serving on " + h.port) + err := http.ListenAndServe(":" + h.port, nil) + if err != nil { + return + } + }() + return nil +} + diff --git a/net/net.go b/net/net.go index 22f7e43..941f7c5 100644 --- a/net/net.go +++ b/net/net.go @@ -1,45 +1,43 @@ package net import ( - "encoding/json" - "fmt" - - "github.com/vocdoni/dvote-relay/batch" - "github.com/vocdoni/dvote-relay/data" - "github.com/vocdoni/dvote-relay/types" + "errors" ) -func Sub(topic string) error { - subscription := data.PsSubscribe(topic) - fmt.Println("Subscribed > " + topic) - var msg data.Record - var err error - for { - msg.Shell, err = subscription.Next() - if err != nil { - return err - } - - payload := msg.Shell.Data - - var e types.Envelope - var b types.Ballot +type Transport interface { + Init(c string) error + Listen() error +} - err = json.Unmarshal(payload, &e) - if err != nil { - return err - } +type TransportID int - err = json.Unmarshal(e.Ballot, &b) - if err != nil { - return err - } +const ( + HTTP TransportID = iota + 1 + PubSub +) - err = batch.Add(b) - if err != nil { - return err - } +func TransportIDFromString(i string) TransportID { + switch i { + case "PubSub" : + return PubSub + case "HTTP": + return HTTP + default: + return -1 + } +} - fmt.Println("Got > " + string(payload)) +func Init(t TransportID) (Transport, error) { + switch t { + case PubSub : + var p PubSubHandle + p.Init("vocdoni_pubsub_testing") + return p, nil + case HTTP : + var h HttpHandle + h.Init("8080/submit") + return h, nil + default: + return nil, errors.New("Bad transport type specification") } } diff --git a/net/pubsub.go b/net/pubsub.go new file mode 100644 index 0000000..bd87ce8 --- /dev/null +++ b/net/pubsub.go @@ -0,0 +1,79 @@ +package net + +import ( + "os" + "fmt" + "encoding/json" + + shell "github.com/ipfs/go-ipfs-api" + "github.com/vocdoni/dvote-relay/batch" + "github.com/vocdoni/dvote-relay/types" +) + +type PubSubHandle struct { + topic string + subscription *shell.PubSubSubscription +} + +func PsSubscribe(topic string) *shell.PubSubSubscription { + sh := shell.NewShell("localhost:5001") + sub, err := sh.PubSubSubscribe(topic) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %s", err) + os.Exit(1) + } + return sub +} + +func PsPublish(topic, data string) error { + sh := shell.NewShell("localhost:5001") + err := sh.PubSubPublish(topic, data) + if err != nil { + return err + } + return nil +} + +func (p PubSubHandle) Init(topic string) error { + p.topic = topic + p.subscription = PsSubscribe(p.topic) + fmt.Println("Subscribed > " + p.topic) + return nil +} + +func (p PubSubHandle) Listen() error { + var msg *shell.Message + var err error + for { + msg, err = p.subscription.Next() + if err != nil { + return err + } + + payload := msg.Data + + var e types.Envelope + var b types.Ballot + + err = json.Unmarshal(payload, &e) + if err != nil { + return err + } + + err = json.Unmarshal(e.Ballot, &b) + if err != nil { + return err + } + + err = batch.Add(b) + if err != nil { + return err + } + + fmt.Println("Got > " + string(payload)) + } +} + +func (p PubSubHandle) Send(data string) error { + return PsPublish(p.topic, data) +}