Browse Source

Merge pull request #19 from vocdoni/feature_modular_transports

Modular transports
feature_chain_module
imw 5 years ago
committed by GitHub
parent
commit
b0333c86aa
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 212 additions and 59 deletions
  1. +2
    -2
      cmd/generator/generator.go
  2. +15
    -2
      cmd/relay/relay.go
  3. +3
    -21
      data/data.go
  4. +81
    -0
      net/http.go
  5. +32
    -34
      net/net.go
  6. +79
    -0
      net/pubsub.go

+ 2
- 2
cmd/generator/generator.go

@ -12,7 +12,7 @@ import (
// "bytes" // "bytes"
// "io/ioutil" // "io/ioutil"
"github.com/vocdoni/dvote-relay/types" "github.com/vocdoni/dvote-relay/types"
"github.com/vocdoni/dvote-relay/data"
"github.com/vocdoni/dvote-relay/net"
) )
func makeBallot() string { func makeBallot() string {
@ -85,7 +85,7 @@ func main() {
case <- timer.C: case <- timer.C:
var jsonStr = makeEnvelope(makeBallot()) var jsonStr = makeEnvelope(makeBallot())
fmt.Println(jsonStr) fmt.Println(jsonStr)
data.PsPublish(topic, jsonStr)
net.PsPublish(topic, jsonStr)
default: default:
continue continue
} }

+ 15
- 2
cmd/relay/relay.go

@ -5,6 +5,8 @@ import (
"time" "time"
"encoding/gob" "encoding/gob"
"bytes" "bytes"
"os"
"flag"
"github.com/vocdoni/dvote-relay/batch" "github.com/vocdoni/dvote-relay/batch"
"github.com/vocdoni/dvote-relay/net" "github.com/vocdoni/dvote-relay/net"
"github.com/vocdoni/dvote-relay/db" "github.com/vocdoni/dvote-relay/db"
@ -19,6 +21,7 @@ var err error
var batchTimer *time.Ticker var batchTimer *time.Ticker
var batchSignal chan bool var batchSignal chan bool
var signal bool var signal bool
var transportType net.TransportID
func main() { func main() {
@ -31,16 +34,26 @@ func main() {
batch.Setup(db) batch.Setup(db)
//gather transport type flag
var transportIDString string
flag.StringVar(&transportIDString, "transport", "PubSub", "Transport must be one of: PubSub, HTTP")
flag.Parse()
transportType = net.TransportIDFromString(transportIDString)
batchTimer = time.NewTicker(time.Second * time.Duration(batchSeconds)) batchTimer = time.NewTicker(time.Second * time.Duration(batchSeconds))
batchSignal = make(chan bool) batchSignal = make(chan bool)
batch.BatchSignal = batchSignal batch.BatchSignal = batchSignal
batch.BatchSize = batchSize batch.BatchSize = batchSize
topic := "vocdoni_pubsub_testing"
fmt.Println("Entering main loop") fmt.Println("Entering main loop")
go net.Sub(topic)
transport, err := net.Init(transportType)
if err != nil {
os.Exit(1)
}
go transport.Listen()
for { for {
select { select {
case <- batchTimer.C: case <- batchTimer.C:

+ 3
- 21
data/data.go

@ -9,8 +9,9 @@ import (
shell "github.com/ipfs/go-ipfs-api" shell "github.com/ipfs/go-ipfs-api"
) )
type Record struct {
Shell *shell.Message
type Storage interface {
Publish(o []byte) string
Retrieve(id string) []byte
} }
func Publish(object []byte) string { func Publish(object []byte) string {
@ -46,22 +47,3 @@ func Retrieve(hash string) []byte {
} }
return content return content
} }
func PsSubscribe(topic string) *shell.PubSubSubscription {
sh := shell.NewShell("localhost:5001")
sub, err := sh.PubSubSubscribe(topic)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %s", err)
os.Exit(1)
}
return sub
}
func PsPublish(topic, data string) error {
sh := shell.NewShell("localhost:5001")
err := sh.PubSubPublish(topic, data)
if err != nil {
return err
}
return nil
}

+ 81
- 0
net/http.go

@ -0,0 +1,81 @@
package net
import (
"io"
"fmt"
"net/http"
"encoding/json"
"strings"
"github.com/vocdoni/dvote-relay/batch"
"github.com/vocdoni/dvote-relay/types"
)
type HttpHandle struct {
port string
path string
}
func (h *HttpHandle) Init(c string) error {
//split c to port and path
cs := strings.Split(c, "/")
h.port = cs[0]
h.path = cs[1]
return nil
}
func parse(rw http.ResponseWriter, request *http.Request) {
decoder := json.NewDecoder(request.Body)
var e types.Envelope
var b types.Ballot
err := decoder.Decode(&e)
if err != nil {
panic(err)
}
err = json.Unmarshal(e.Ballot, &b)
if err != nil {
panic(err)
}
//check PoW
//check key
//decrypt
//check franchise
//construct packet
//this should should be randomized, or actually taken from input
//b.PID = "1"
//b.Nullifier = []byte{1,2,3}
//b.Vote = []byte{4,5,6}
//b.Franchise = []byte{7,8,9}
err = batch.Add(b)
if err != nil {
panic(err)
}
j, err := json.Marshal(e)
if err != nil {
panic(err)
}
io.WriteString(rw, string(j))
}
func (h *HttpHandle) Listen() error {
http.HandleFunc(h.path, parse)
//add waitgroup
func() {
fmt.Println("serving on " + h.port + "/" + h.path)
err := http.ListenAndServe(":" + h.port, nil)
if err != nil {
return
}
}()
return nil
}

+ 32
- 34
net/net.go

@ -1,45 +1,43 @@
package net package net
import ( import (
"encoding/json"
"fmt"
"github.com/vocdoni/dvote-relay/batch"
"github.com/vocdoni/dvote-relay/data"
"github.com/vocdoni/dvote-relay/types"
"errors"
) )
func Sub(topic string) error {
subscription := data.PsSubscribe(topic)
fmt.Println("Subscribed > " + topic)
var msg data.Record
var err error
for {
msg.Shell, err = subscription.Next()
if err != nil {
return err
}
payload := msg.Shell.Data
var e types.Envelope
var b types.Ballot
type Transport interface {
Listen() error
Init(c string) error
}
err = json.Unmarshal(payload, &e)
if err != nil {
return err
}
type TransportID int
err = json.Unmarshal(e.Ballot, &b)
if err != nil {
return err
}
const (
HTTP TransportID = iota + 1
PubSub
)
err = batch.Add(b)
if err != nil {
return err
}
func TransportIDFromString(i string) TransportID {
switch i {
case "PubSub" :
return PubSub
case "HTTP":
return HTTP
default:
return -1
}
}
fmt.Println("Got > " + string(payload))
func Init(t TransportID) (Transport, error) {
switch t {
case PubSub :
p := new(PubSubHandle)
p.Init("vocdoni_pubsub_testing")
return p, nil
case HTTP :
h := new(HttpHandle)
h.Init("8080/submit")
return h, nil
default:
return nil, errors.New("Bad transport type specification")
} }
} }

+ 79
- 0
net/pubsub.go

@ -0,0 +1,79 @@
package net
import (
"os"
"fmt"
"encoding/json"
shell "github.com/ipfs/go-ipfs-api"
"github.com/vocdoni/dvote-relay/batch"
"github.com/vocdoni/dvote-relay/types"
)
type PubSubHandle struct {
topic string
subscription *shell.PubSubSubscription
}
func PsSubscribe(topic string) *shell.PubSubSubscription {
sh := shell.NewShell("localhost:5001")
sub, err := sh.PubSubSubscribe(topic)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %s", err)
os.Exit(1)
}
return sub
}
func PsPublish(topic, data string) error {
sh := shell.NewShell("localhost:5001")
err := sh.PubSubPublish(topic, data)
if err != nil {
return err
}
return nil
}
func (p *PubSubHandle) Init(topic string) error {
p.topic = topic
p.subscription = PsSubscribe(p.topic)
return nil
}
func (p *PubSubHandle) Listen() error {
var msg *shell.Message
var err error
for {
msg, err = p.subscription.Next()
if err != nil {
fmt.Fprintf(os.Stderr, "recieve error: %s", err)
return err
}
payload := msg.Data
var e types.Envelope
var b types.Ballot
err = json.Unmarshal(payload, &e)
if err != nil {
return err
}
err = json.Unmarshal(e.Ballot, &b)
if err != nil {
return err
}
err = batch.Add(b)
if err != nil {
return err
}
fmt.Println("Got > " + string(payload))
}
}
func (p *PubSubHandle) Send(data string) error {
return PsPublish(p.topic, data)
}

Loading…
Cancel
Save