Browse Source

pubsub messagaing working w/ generator

pubsub_testing
imw 5 years ago
parent
commit
90de87f243
6 changed files with 54 additions and 26 deletions
  1. +10
    -21
      cmd/generator/generator.go
  2. +3
    -2
      cmd/relay/relay.go
  3. +3
    -3
      data/data.go
  4. +1
    -0
      go.mod
  5. +2
    -0
      go.sum
  6. +35
    -0
      net/net.go

+ 10
- 21
cmd/generator/generator.go

@ -8,10 +8,11 @@ import (
"math/rand" "math/rand"
"encoding/json" "encoding/json"
"encoding/base64" "encoding/base64"
"net/http"
"bytes"
"io/ioutil"
// "net/http"
// "bytes"
// "io/ioutil"
"github.com/vocdoni/dvote-relay/types" "github.com/vocdoni/dvote-relay/types"
"github.com/vocdoni/dvote-relay/data"
) )
func makeBallot() string { func makeBallot() string {
@ -75,28 +76,16 @@ func main() {
i, _ := strconv.Atoi(interval) i, _ := strconv.Atoi(interval)
timer := time.NewTicker(time.Millisecond * time.Duration(i)) timer := time.NewTicker(time.Millisecond * time.Duration(i))
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
url := "http://localhost:8090/submit"
fmt.Println("URL:>", url)
topic := "vocdoni_pubsub_testing"
fmt.Println("PubSub Topic:>", topic)
for { for {
select { select {
case <- timer.C: case <- timer.C:
fmt.Println(makeEnvelope(makeBallot()))
var jsonStr = []byte(makeEnvelope(makeBallot()))
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonStr))
req.Header.Set("X-Custom-Header", "myvalue")
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
fmt.Println("response Status:", resp.Status)
fmt.Println("response Headers:", resp.Header)
body, _ := ioutil.ReadAll(resp.Body)
fmt.Println("response Body:", string(body))
var jsonStr = makeEnvelope(makeBallot())
fmt.Println(jsonStr)
data.PsPublish(topic, jsonStr)
default: default:
continue continue
} }

+ 3
- 2
cmd/relay/relay.go

@ -37,8 +37,10 @@ func main() {
batch.BatchSignal = batchSignal batch.BatchSignal = batchSignal
batch.BatchSize = batchSize batch.BatchSize = batchSize
topic := "vocdoni_pubsub_testing"
fmt.Println("Entering main loop") fmt.Println("Entering main loop")
go net.Listen("8090")
go net.Sub(topic)
for { for {
select { select {
case <- batchTimer.C: case <- batchTimer.C:
@ -55,7 +57,6 @@ func main() {
cid := data.Publish(bb) cid := data.Publish(bb)
data.Pin(cid) data.Pin(cid)
fmt.Printf("Batch published at: %s \n", cid) fmt.Printf("Batch published at: %s \n", cid)
// add to ipfs
// add to chain // add to chain
// announce to pubsub // announce to pubsub
//fmt.Println("Nullifiers:") //fmt.Println("Nullifiers:")

+ 3
- 3
data/data.go

@ -53,11 +53,11 @@ func PsSubscribe(topic string) *shell.PubSubSubscription {
return sub return sub
} }
func PsPublish(topic, data string) {
func PsPublish(topic, data string) error {
sh := shell.NewShell("localhost:5001") sh := shell.NewShell("localhost:5001")
err := sh.PubSubPublish(topic, data) err := sh.PubSubPublish(topic, data)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "error: %s", err)
os.Exit(1)
return err
} }
return nil
} }

+ 1
- 0
go.mod

@ -6,6 +6,7 @@ require (
github.com/gogo/protobuf v1.2.0 // indirect github.com/gogo/protobuf v1.2.0 // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/gxed/hashland v0.0.0-20180221191214-d9f6b97f8db2 // indirect github.com/gxed/hashland v0.0.0-20180221191214-d9f6b97f8db2 // indirect
github.com/ipfs/go-ipfs v0.4.18 // indirect
github.com/ipfs/go-ipfs-api v1.3.5 github.com/ipfs/go-ipfs-api v1.3.5
github.com/ipfs/go-ipfs-cmdkit v1.1.3 // indirect github.com/ipfs/go-ipfs-cmdkit v1.1.3 // indirect
github.com/libp2p/go-flow-metrics v0.2.0 // indirect github.com/libp2p/go-flow-metrics v0.2.0 // indirect

+ 2
- 0
go.sum

@ -20,6 +20,8 @@ github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8l
github.com/gxed/hashland v0.0.0-20180221191214-d9f6b97f8db2 h1:neM/RzmgBKxsJ3ioEZnIQmgQQq/sn6xDqYOEYnH3RYM= github.com/gxed/hashland v0.0.0-20180221191214-d9f6b97f8db2 h1:neM/RzmgBKxsJ3ioEZnIQmgQQq/sn6xDqYOEYnH3RYM=
github.com/gxed/hashland v0.0.0-20180221191214-d9f6b97f8db2/go.mod h1:YUhWml1NaWLTNBl4NPptkB8MadfaIhgq+a2TRc+Mw4Q= github.com/gxed/hashland v0.0.0-20180221191214-d9f6b97f8db2/go.mod h1:YUhWml1NaWLTNBl4NPptkB8MadfaIhgq+a2TRc+Mw4Q=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ipfs/go-ipfs v0.4.18 h1:QBpaJj4emf63RWnqi/PFRQrwGvanlbNqPHHHfVz8IiU=
github.com/ipfs/go-ipfs v0.4.18/go.mod h1:iXzbK+Wa6eePj3jQg/uY6Uoq5iOwY+GToD/bgaRadto=
github.com/ipfs/go-ipfs-api v1.3.5 h1:NvOdNXkN1S9XiaRlpPV8SUOBKcRVlpUhoHgbDteMs60= github.com/ipfs/go-ipfs-api v1.3.5 h1:NvOdNXkN1S9XiaRlpPV8SUOBKcRVlpUhoHgbDteMs60=
github.com/ipfs/go-ipfs-api v1.3.5/go.mod h1:YWGjU+7Bdls1CpvsKsV6EsQ/KMyQqSpBru2hme/5WQg= github.com/ipfs/go-ipfs-api v1.3.5/go.mod h1:YWGjU+7Bdls1CpvsKsV6EsQ/KMyQqSpBru2hme/5WQg=
github.com/ipfs/go-ipfs-cmdkit v1.1.3 h1:4sbEnLjYmh8gJOCUiqtKp0EIboSYTFrxftc7S+U3gK4= github.com/ipfs/go-ipfs-cmdkit v1.1.3 h1:4sbEnLjYmh8gJOCUiqtKp0EIboSYTFrxftc7S+U3gK4=

+ 35
- 0
net/net.go

@ -7,9 +7,11 @@ import (
"io" "io"
"github.com/vocdoni/dvote-relay/batch" "github.com/vocdoni/dvote-relay/batch"
"github.com/vocdoni/dvote-relay/types" "github.com/vocdoni/dvote-relay/types"
"github.com/vocdoni/dvote-relay/data"
) )
func parse(rw http.ResponseWriter, request *http.Request) { func parse(rw http.ResponseWriter, request *http.Request) {
decoder := json.NewDecoder(request.Body) decoder := json.NewDecoder(request.Body)
@ -51,6 +53,39 @@ func parse(rw http.ResponseWriter, request *http.Request) {
io.WriteString(rw, string(j)) io.WriteString(rw, string(j))
} }
func Sub(topic string) error {
subscription := data.PsSubscribe(topic)
fmt.Println("Subscribed > " + topic)
for {
msg, err := subscription.Next()
if err != nil {
return err
}
payload := msg.Data()
var e types.Envelope
var b types.Ballot
err = json.Unmarshal(payload, &e)
if err != nil {
return err
}
err = json.Unmarshal(e.Ballot, &b)
if err != nil {
return err
}
err = batch.Add(b)
if err != nil {
return err
}
fmt.Println("Got > " + string(payload))
}
}
func Listen(port string) { func Listen(port string) {
http.HandleFunc("/submit", parse) http.HandleFunc("/submit", parse)
//add waitgroup //add waitgroup

Loading…
Cancel
Save