Browse Source

initial attempt at modular transports, fails with segfault

feature_modular_transports
imw 5 years ago
parent
commit
5ce8bb33ee
5 changed files with 210 additions and 57 deletions
  1. +15
    -2
      cmd/relay/relay.go
  2. +3
    -21
      data/data.go
  3. +81
    -0
      net/http.go
  4. +32
    -34
      net/net.go
  5. +79
    -0
      net/pubsub.go

+ 15
- 2
cmd/relay/relay.go

@ -5,6 +5,8 @@ import (
"time"
"encoding/gob"
"bytes"
"os"
"flag"
"github.com/vocdoni/dvote-relay/batch"
"github.com/vocdoni/dvote-relay/net"
"github.com/vocdoni/dvote-relay/db"
@ -19,6 +21,7 @@ var err error
var batchTimer *time.Ticker
var batchSignal chan bool
var signal bool
var transportType net.TransportID
func main() {
@ -31,16 +34,26 @@ func main() {
batch.Setup(db)
//gather transport type flag
var transportIDString string
flag.StringVar(&transportIDString, "transport", "PubSub", "Transport must be one of: PubSub, HTTP")
flag.Parse()
transportType = net.TransportIDFromString(transportIDString)
batchTimer = time.NewTicker(time.Second * time.Duration(batchSeconds))
batchSignal = make(chan bool)
batch.BatchSignal = batchSignal
batch.BatchSize = batchSize
topic := "vocdoni_pubsub_testing"
fmt.Println("Entering main loop")
go net.Sub(topic)
transport, err := net.Init(transportType)
if err != nil {
os.Exit(1)
}
go transport.Listen()
for {
select {
case <- batchTimer.C:

+ 3
- 21
data/data.go

@ -9,8 +9,9 @@ import (
shell "github.com/ipfs/go-ipfs-api"
)
type Record struct {
Shell *shell.Message
type Storage interface {
Publish(o []byte) string
Retrieve(id string) []byte
}
func Publish(object []byte) string {
@ -46,22 +47,3 @@ func Retrieve(hash string) []byte {
}
return content
}
func PsSubscribe(topic string) *shell.PubSubSubscription {
sh := shell.NewShell("localhost:5001")
sub, err := sh.PubSubSubscribe(topic)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %s", err)
os.Exit(1)
}
return sub
}
func PsPublish(topic, data string) error {
sh := shell.NewShell("localhost:5001")
err := sh.PubSubPublish(topic, data)
if err != nil {
return err
}
return nil
}

+ 81
- 0
net/http.go

@ -0,0 +1,81 @@
package net
import (
"io"
"fmt"
"net/http"
"encoding/json"
"strings"
"github.com/vocdoni/dvote-relay/batch"
"github.com/vocdoni/dvote-relay/types"
)
type HttpHandle struct {
port string
path string
}
func (h HttpHandle) Init(c string) error {
//split c to port and path
cs := strings.Split(c, "/")
h.port = cs[0]
h.path = cs[1]
return nil
}
func parse(rw http.ResponseWriter, request *http.Request) {
decoder := json.NewDecoder(request.Body)
var e types.Envelope
var b types.Ballot
err := decoder.Decode(&e)
if err != nil {
panic(err)
}
err = json.Unmarshal(e.Ballot, &b)
if err != nil {
panic(err)
}
//check PoW
//check key
//decrypt
//check franchise
//construct packet
//this should should be randomized, or actually taken from input
//b.PID = "1"
//b.Nullifier = []byte{1,2,3}
//b.Vote = []byte{4,5,6}
//b.Franchise = []byte{7,8,9}
err = batch.Add(b)
if err != nil {
panic(err)
}
j, err := json.Marshal(e)
if err != nil {
panic(err)
}
io.WriteString(rw, string(j))
}
func (h HttpHandle) Listen() error {
http.HandleFunc(h.path, parse)
//add waitgroup
func() {
fmt.Println("serving on " + h.port)
err := http.ListenAndServe(":" + h.port, nil)
if err != nil {
return
}
}()
return nil
}

+ 32
- 34
net/net.go

@ -1,45 +1,43 @@
package net
import (
"encoding/json"
"fmt"
"github.com/vocdoni/dvote-relay/batch"
"github.com/vocdoni/dvote-relay/data"
"github.com/vocdoni/dvote-relay/types"
"errors"
)
func Sub(topic string) error {
subscription := data.PsSubscribe(topic)
fmt.Println("Subscribed > " + topic)
var msg data.Record
var err error
for {
msg.Shell, err = subscription.Next()
if err != nil {
return err
}
payload := msg.Shell.Data
var e types.Envelope
var b types.Ballot
type Transport interface {
Init(c string) error
Listen() error
}
err = json.Unmarshal(payload, &e)
if err != nil {
return err
}
type TransportID int
err = json.Unmarshal(e.Ballot, &b)
if err != nil {
return err
}
const (
HTTP TransportID = iota + 1
PubSub
)
err = batch.Add(b)
if err != nil {
return err
}
func TransportIDFromString(i string) TransportID {
switch i {
case "PubSub" :
return PubSub
case "HTTP":
return HTTP
default:
return -1
}
}
fmt.Println("Got > " + string(payload))
func Init(t TransportID) (Transport, error) {
switch t {
case PubSub :
var p PubSubHandle
p.Init("vocdoni_pubsub_testing")
return p, nil
case HTTP :
var h HttpHandle
h.Init("8080/submit")
return h, nil
default:
return nil, errors.New("Bad transport type specification")
}
}

+ 79
- 0
net/pubsub.go

@ -0,0 +1,79 @@
package net
import (
"os"
"fmt"
"encoding/json"
shell "github.com/ipfs/go-ipfs-api"
"github.com/vocdoni/dvote-relay/batch"
"github.com/vocdoni/dvote-relay/types"
)
type PubSubHandle struct {
topic string
subscription *shell.PubSubSubscription
}
func PsSubscribe(topic string) *shell.PubSubSubscription {
sh := shell.NewShell("localhost:5001")
sub, err := sh.PubSubSubscribe(topic)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %s", err)
os.Exit(1)
}
return sub
}
func PsPublish(topic, data string) error {
sh := shell.NewShell("localhost:5001")
err := sh.PubSubPublish(topic, data)
if err != nil {
return err
}
return nil
}
func (p PubSubHandle) Init(topic string) error {
p.topic = topic
p.subscription = PsSubscribe(p.topic)
fmt.Println("Subscribed > " + p.topic)
return nil
}
func (p PubSubHandle) Listen() error {
var msg *shell.Message
var err error
for {
msg, err = p.subscription.Next()
if err != nil {
return err
}
payload := msg.Data
var e types.Envelope
var b types.Ballot
err = json.Unmarshal(payload, &e)
if err != nil {
return err
}
err = json.Unmarshal(e.Ballot, &b)
if err != nil {
return err
}
err = batch.Add(b)
if err != nil {
return err
}
fmt.Println("Got > " + string(payload))
}
}
func (p PubSubHandle) Send(data string) error {
return PsPublish(p.topic, data)
}

Loading…
Cancel
Save