From 90de87f2436493ab8462d74a960b3022fcab3bc2 Mon Sep 17 00:00:00 2001 From: imw Date: Tue, 22 Jan 2019 16:44:45 +0100 Subject: [PATCH 1/2] pubsub messagaing working w/ generator --- cmd/generator/generator.go | 31 ++++++++++--------------------- cmd/relay/relay.go | 5 +++-- data/data.go | 6 +++--- go.mod | 1 + go.sum | 2 ++ net/net.go | 35 +++++++++++++++++++++++++++++++++++ 6 files changed, 54 insertions(+), 26 deletions(-) diff --git a/cmd/generator/generator.go b/cmd/generator/generator.go index e51f1eb..6990d32 100644 --- a/cmd/generator/generator.go +++ b/cmd/generator/generator.go @@ -8,10 +8,11 @@ import ( "math/rand" "encoding/json" "encoding/base64" - "net/http" - "bytes" - "io/ioutil" +// "net/http" +// "bytes" +// "io/ioutil" "github.com/vocdoni/dvote-relay/types" + "github.com/vocdoni/dvote-relay/data" ) func makeBallot() string { @@ -75,28 +76,16 @@ func main() { i, _ := strconv.Atoi(interval) timer := time.NewTicker(time.Millisecond * time.Duration(i)) rand.Seed(time.Now().UnixNano()) - url := "http://localhost:8090/submit" - fmt.Println("URL:>", url) + topic := "vocdoni_pubsub_testing" + fmt.Println("PubSub Topic:>", topic) + for { select { case <- timer.C: - fmt.Println(makeEnvelope(makeBallot())) - var jsonStr = []byte(makeEnvelope(makeBallot())) - req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonStr)) - req.Header.Set("X-Custom-Header", "myvalue") - req.Header.Set("Content-Type", "application/json") - - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - panic(err) - } - defer resp.Body.Close() - fmt.Println("response Status:", resp.Status) - fmt.Println("response Headers:", resp.Header) - body, _ := ioutil.ReadAll(resp.Body) - fmt.Println("response Body:", string(body)) + var jsonStr = makeEnvelope(makeBallot()) + fmt.Println(jsonStr) + data.PsPublish(topic, jsonStr) default: continue } diff --git a/cmd/relay/relay.go b/cmd/relay/relay.go index fed06bc..4adcdf9 100644 --- a/cmd/relay/relay.go +++ b/cmd/relay/relay.go @@ -37,8 +37,10 @@ func main() { batch.BatchSignal = batchSignal batch.BatchSize = batchSize + topic := "vocdoni_pubsub_testing" + fmt.Println("Entering main loop") - go net.Listen("8090") + go net.Sub(topic) for { select { case <- batchTimer.C: @@ -55,7 +57,6 @@ func main() { cid := data.Publish(bb) data.Pin(cid) fmt.Printf("Batch published at: %s \n", cid) - // add to ipfs // add to chain // announce to pubsub //fmt.Println("Nullifiers:") diff --git a/data/data.go b/data/data.go index c48997e..5f56f31 100644 --- a/data/data.go +++ b/data/data.go @@ -53,11 +53,11 @@ func PsSubscribe(topic string) *shell.PubSubSubscription { return sub } -func PsPublish(topic, data string) { +func PsPublish(topic, data string) error { sh := shell.NewShell("localhost:5001") err := sh.PubSubPublish(topic, data) if err != nil { - fmt.Fprintf(os.Stderr, "error: %s", err) - os.Exit(1) + return err } + return nil } diff --git a/go.mod b/go.mod index 92b4ed3..442ed0c 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/gogo/protobuf v1.2.0 // indirect github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect github.com/gxed/hashland v0.0.0-20180221191214-d9f6b97f8db2 // indirect + github.com/ipfs/go-ipfs v0.4.18 // indirect github.com/ipfs/go-ipfs-api v1.3.5 github.com/ipfs/go-ipfs-cmdkit v1.1.3 // indirect github.com/libp2p/go-flow-metrics v0.2.0 // indirect diff --git a/go.sum b/go.sum index 44cd846..540d492 100644 --- a/go.sum +++ b/go.sum @@ -20,6 +20,8 @@ github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8l github.com/gxed/hashland v0.0.0-20180221191214-d9f6b97f8db2 h1:neM/RzmgBKxsJ3ioEZnIQmgQQq/sn6xDqYOEYnH3RYM= github.com/gxed/hashland v0.0.0-20180221191214-d9f6b97f8db2/go.mod h1:YUhWml1NaWLTNBl4NPptkB8MadfaIhgq+a2TRc+Mw4Q= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/ipfs/go-ipfs v0.4.18 h1:QBpaJj4emf63RWnqi/PFRQrwGvanlbNqPHHHfVz8IiU= +github.com/ipfs/go-ipfs v0.4.18/go.mod h1:iXzbK+Wa6eePj3jQg/uY6Uoq5iOwY+GToD/bgaRadto= github.com/ipfs/go-ipfs-api v1.3.5 h1:NvOdNXkN1S9XiaRlpPV8SUOBKcRVlpUhoHgbDteMs60= github.com/ipfs/go-ipfs-api v1.3.5/go.mod h1:YWGjU+7Bdls1CpvsKsV6EsQ/KMyQqSpBru2hme/5WQg= github.com/ipfs/go-ipfs-cmdkit v1.1.3 h1:4sbEnLjYmh8gJOCUiqtKp0EIboSYTFrxftc7S+U3gK4= diff --git a/net/net.go b/net/net.go index 1a88021..8091f6e 100644 --- a/net/net.go +++ b/net/net.go @@ -7,9 +7,11 @@ import ( "io" "github.com/vocdoni/dvote-relay/batch" "github.com/vocdoni/dvote-relay/types" + "github.com/vocdoni/dvote-relay/data" ) + func parse(rw http.ResponseWriter, request *http.Request) { decoder := json.NewDecoder(request.Body) @@ -51,6 +53,39 @@ func parse(rw http.ResponseWriter, request *http.Request) { io.WriteString(rw, string(j)) } +func Sub(topic string) error { + subscription := data.PsSubscribe(topic) + fmt.Println("Subscribed > " + topic) + for { + msg, err := subscription.Next() + if err != nil { + return err + } + + payload := msg.Data() + + var e types.Envelope + var b types.Ballot + + err = json.Unmarshal(payload, &e) + if err != nil { + return err + } + + err = json.Unmarshal(e.Ballot, &b) + if err != nil { + return err + } + + err = batch.Add(b) + if err != nil { + return err + } + + fmt.Println("Got > " + string(payload)) + } +} + func Listen(port string) { http.HandleFunc("/submit", parse) //add waitgroup From b88d88e931edffec5c4d90a7a06a1658ad1ed6b9 Mon Sep 17 00:00:00 2001 From: imw Date: Wed, 23 Jan 2019 13:45:04 +0100 Subject: [PATCH 2/2] removed remnant of http listener --- net/net.go | 56 ------------------------------------------------------ 1 file changed, 56 deletions(-) diff --git a/net/net.go b/net/net.go index 8091f6e..2c66ad1 100644 --- a/net/net.go +++ b/net/net.go @@ -3,56 +3,12 @@ package net import ( "encoding/json" "fmt" - "net/http" - "io" "github.com/vocdoni/dvote-relay/batch" "github.com/vocdoni/dvote-relay/types" "github.com/vocdoni/dvote-relay/data" ) - -func parse(rw http.ResponseWriter, request *http.Request) { - decoder := json.NewDecoder(request.Body) - - var e types.Envelope - var b types.Ballot - - err := decoder.Decode(&e) - if err != nil { - panic(err) - } - - err = json.Unmarshal(e.Ballot, &b) - if err != nil { - panic(err) - } - - - //check PoW - //check key - //decrypt - //check franchise - //construct packet - - //this should should be randomized, or actually taken from input - //b.PID = "1" - //b.Nullifier = []byte{1,2,3} - //b.Vote = []byte{4,5,6} - //b.Franchise = []byte{7,8,9} - - err = batch.Add(b) - if err != nil { - panic(err) - } - - j, err := json.Marshal(e) - if err != nil { - panic(err) - } - io.WriteString(rw, string(j)) -} - func Sub(topic string) error { subscription := data.PsSubscribe(topic) fmt.Println("Subscribed > " + topic) @@ -85,15 +41,3 @@ func Sub(topic string) error { fmt.Println("Got > " + string(payload)) } } - -func Listen(port string) { - http.HandleFunc("/submit", parse) - //add waitgroup - func() { - fmt.Println("serving on " + port) - err := http.ListenAndServe(":" + port, nil) - if err != nil { - panic("ListenAndServe: " + err.Error()) - } - }() -}