Browse Source

Merge pull request #15 from vocdoni/batching_initial_approach

All work so far
pubsub_testing
imw 5 years ago
committed by GitHub
parent
commit
d8e9ade40a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 620 additions and 0 deletions
  1. +3
    -0
      README.md
  2. +84
    -0
      batch/batch.go
  3. +1
    -0
      batch/batch_test.go
  4. +63
    -0
      data/data.go
  5. +34
    -0
      data/data_test.go
  6. +113
    -0
      db/leveldb.go
  7. +104
    -0
      generator/generator.go
  8. +71
    -0
      main.go
  9. +64
    -0
      net/net.go
  10. +52
    -0
      net/net_test.go
  11. +31
    -0
      types/types.go

+ 3
- 0
README.md

@ -1,3 +1,6 @@
# votingRelay
dVote library for Relay
With application running, you can submit fake votes with a json request of the form:
curl -H 'Content-Type: application/json'e":"package","Nonce":"bm9uY2U=","KeyProof":"cHJvb2Y=","Package":"dm90ZXBhY2thZ2U=","Timestamp":"2018-12-14T15:04:05Z"}' http://localhost:8080/submit

+ 84
- 0
batch/batch.go

@ -0,0 +1,84 @@
package batch
import (
"fmt"
"github.com/vocdoni/dvote-relay/types"
"github.com/vocdoni/dvote-relay/db"
"encoding/json"
)
var rdb *db.LevelDbStorage
var bdb *db.LevelDbStorage
var BatchSignal chan bool
var BatchSize int
var err error
func Setup(l *db.LevelDbStorage) {
rdb = l.WithPrefix([]byte("relay_"))
bdb = l.WithPrefix([]byte("batch_"))
}
//add (queue for counting)
func Add(ballot types.Ballot) error {
//this is probably adding []
//err := bdb.Put(fmt.Sprintf("%v", p.Nullifier)),[]byte(fmt.Sprintf("%v", p)))
var b []byte
var n []byte
var err error
b, err = json.Marshal(ballot)
n, err = json.Marshal(ballot.Nullifier)
err = bdb.Put(n,b)
if err != nil {
return err
}
//this actually needs to see if it was added
if bdb.Count() >= BatchSize {
BatchSignal <- true
}
return nil
}
//create (return batch)
//k is []byte 'batch_' + nullifier
//v is []byte package
//returns slice of nullifiers, batch json
func Fetch() ([]string, []string) {
var n []string
var b []string
iter := bdb.Iter()
for iter.Next() {
k := iter.Key()
v := iter.Value()
err := iter.Error()
if err != nil {
panic(err)
}
n = append(n, string(k[6:]))
b = append(b, string(v))
}
iter.Release()
// jn, err := json.Marshal(n)
// if err != nil {
// panic(err)
// }
// jb, err := json.Marshal(b)
// if err != nil {
// panic(err)
// }
return n, b
}
//move from bdb to rdb once pinned
func Compact(n []string) {
for _, k := range n {
//fmt.Println(k)
v, err := bdb.Get([]byte(k))
if err != nil {
fmt.Println(err.Error())
}
rdb.Put([]byte(k), v)
bdb.Delete([]byte(k))
}
}

+ 1
- 0
batch/batch_test.go

@ -0,0 +1 @@
package batch

+ 63
- 0
data/data.go

@ -0,0 +1,63 @@
package data
import (
"os"
"fmt"
"bytes"
"io/ioutil"
shell "github.com/ipfs/go-ipfs-api"
)
func Publish(object []byte) string {
sh := shell.NewShell("localhost:5001")
cid, err := sh.Add(bytes.NewBuffer(object))
if err != nil {
fmt.Fprintf(os.Stderr, "error: %s", err)
os.Exit(1)
}
return cid
}
func Pin(path string) {
sh := shell.NewShell("localhost:5001")
err := sh.Pin(path)
if err != nil{
fmt.Fprintf(os.Stderr, "error: %s", err)
os.Exit(1)
}
}
func Retrieve(hash string) []byte {
sh := shell.NewShell("localhost:5001")
reader, err := sh.Cat(hash)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %s", err)
os.Exit(1)
}
content, err := ioutil.ReadAll(reader)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %s", err)
os.Exit(1)
}
return content
}
func PsSubscribe(topic string) *shell.PubSubSubscription {
sh := shell.NewShell("localhost:5001")
sub, err := sh.PubSubSubscribe(topic)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %s", err)
os.Exit(1)
}
return sub
}
func PsPublish(topic, data string) {
sh := shell.NewShell("localhost:5001")
err := sh.PubSubPublish(topic, data)
if err != nil {
fmt.Fprintf(os.Stderr, "error: %s", err)
os.Exit(1)
}
}

+ 34
- 0
data/data_test.go

@ -0,0 +1,34 @@
package data
import (
"testing"
"fmt"
"encoding/json"
"strings"
)
func TestPublishAndRetrieve(t *testing.T) {
t.Log("Testing adding json")
exampleVote := votePacket{
000001,
"12309801002",
"nynnynnnynnnyy",
"132498-0-02103908",
}
testObject, err := json.Marshal(exampleVote)
if err != nil {
t.Errorf("Bad test JSON: %s", err)
}
prepub := string(testObject)
hash := publish(testObject)
content := retrieve(hash)
postpub := string(content)
//fmt.Println(hash)
//fmt.Println(string(content))
if strings.Compare(prepub,postpub) != 0 {
t.Errorf("Published file doesn't match. Expected:\n %s \n Got: \n %s \n", prepub, postpub)
}
}

+ 113
- 0
db/leveldb.go

@ -0,0 +1,113 @@
package db
// modified from https://github.com/iden3/go-iden3/blob/master/db/leveldb.go
import (
"encoding/json"
"github.com/syndtr/goleveldb/leveldb"
// "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
"github.com/syndtr/goleveldb/leveldb/iterator"
)
type LevelDbStorage struct {
ldb *leveldb.DB
prefix []byte
}
func NewLevelDbStorage(path string, errorIfMissing bool) (*LevelDbStorage, error) {
o := &opt.Options{
ErrorIfMissing: errorIfMissing,
}
ldb, err := leveldb.OpenFile(path, o)
if err != nil {
return nil, err
}
return &LevelDbStorage{ldb, []byte{}}, nil
}
type storageInfo struct {
KeyCount int
}
func (l *LevelDbStorage) Count() int {
keycount := 0
db := l.ldb
iter := db.NewIterator(util.BytesPrefix(l.prefix), nil)
for iter.Next() {
keycount++
}
iter.Release()
if err := iter.Error(); err != nil {
panic(err)
}
return keycount
}
func (l *LevelDbStorage) Info() string {
keycount := 0
db := l.ldb
iter := db.NewIterator(util.BytesPrefix(l.prefix), nil)
for iter.Next() {
keycount++
}
iter.Release()
if err := iter.Error(); err != nil {
return err.Error()
}
json, _ := json.MarshalIndent(
storageInfo{keycount},
"", " ",
)
return string(json)
}
func (l *LevelDbStorage) WithPrefix(prefix []byte) *LevelDbStorage {
return &LevelDbStorage{l.ldb, append(l.prefix, prefix...)}
}
func (l *LevelDbStorage) Get(key []byte) ([]byte, error) {
v, err := l.ldb.Get(append(l.prefix, key[:]...), nil)
if err != nil {
return nil, err
}
return v, err
}
func (l *LevelDbStorage) Put(key []byte, value []byte) error {
err := l.ldb.Put(append(l.prefix, key[:]...), value, nil)
if err != nil {
return err
}
return nil
}
func (l *LevelDbStorage) Delete(key []byte) error {
err := l.ldb.Delete(append(l.prefix, key[:]...), nil)
if err != nil {
return err
}
return nil
}
func (l *LevelDbStorage) Close() {
if err := l.ldb.Close(); err != nil {
panic(err)
}
}
func (l *LevelDbStorage) Iter() iterator.Iterator {
db := l.ldb
i := db.NewIterator(util.BytesPrefix(l.prefix), nil)
return i
}
func (l *LevelDbStorage) LevelDB() *leveldb.DB {
return l.ldb
}

+ 104
- 0
generator/generator.go

@ -0,0 +1,104 @@
package main
import (
"os"
"strconv"
"time"
"fmt"
"math/rand"
"encoding/json"
"encoding/base64"
"net/http"
"bytes"
"io/ioutil"
"github.com/vocdoni/dvote-relay/types"
)
func makeBallot() string {
var bal types.Ballot
bal.Type = "ballot0"
pidBytes := make([]byte, 32)
rand.Read(pidBytes)
bal.PID = base64.StdEncoding.EncodeToString(pidBytes)
nullifier := make([]byte, 32)
rand.Read(nullifier)
bal.Nullifier = nullifier
vote := make([]byte, 32)
rand.Read(vote)
bal.Vote = vote
franchise := make([]byte, 32)
rand.Read(franchise)
bal.Franchise = franchise
b, err := json.Marshal(bal)
if err != nil {
fmt.Println(err)
return "error"
}
//todo: add encryption, pow
return string(b)
}
func makeEnvelope(ballot string) string {
var env types.Envelope
env.Type = "envelope0"
env.Nonce = rand.Uint64()
kp := make([]byte, 4)
rand.Read(kp)
env.KeyProof = kp
env.Ballot = []byte(ballot)
env.Timestamp = time.Now()
e, err := json.Marshal(env)
if err != nil {
fmt.Println(err)
return "error"
}
//todo: add encryption, pow
return string(e)
}
func main() {
interval := os.Args[1]
i, _ := strconv.Atoi(interval)
timer := time.NewTicker(time.Millisecond * time.Duration(i))
rand.Seed(time.Now().UnixNano())
url := "http://localhost:8090/submit"
fmt.Println("URL:>", url)
for {
select {
case <- timer.C:
fmt.Println(makeEnvelope(makeBallot()))
var jsonStr = []byte(makeEnvelope(makeBallot()))
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonStr))
req.Header.Set("X-Custom-Header", "myvalue")
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
fmt.Println("response Status:", resp.Status)
fmt.Println("response Headers:", resp.Header)
body, _ := ioutil.ReadAll(resp.Body)
fmt.Println("response Body:", string(body))
default:
continue
}
}
}

+ 71
- 0
main.go

@ -0,0 +1,71 @@
package main
import (
"fmt"
"time"
"encoding/gob"
"bytes"
"github.com/vocdoni/dvote-relay/batch"
"github.com/vocdoni/dvote-relay/net"
"github.com/vocdoni/dvote-relay/db"
"github.com/vocdoni/dvote-relay/data"
)
var dbPath = "~/.dvote/relay.db"
var batchSeconds = 10 //seconds
var batchSize = 3 //packets
var err error
var batchTimer *time.Ticker
var batchSignal chan bool
var signal bool
func main() {
db, err := db.NewLevelDbStorage(dbPath, false)
if err != nil {
panic(err)
}
defer db.Close()
batch.Setup(db)
batchTimer = time.NewTicker(time.Second * time.Duration(batchSeconds))
batchSignal = make(chan bool)
batch.BatchSignal = batchSignal
batch.BatchSize = batchSize
fmt.Println("Entering main loop")
go net.Listen("8090")
for {
select {
case <- batchTimer.C:
fmt.Println("Timer triggered")
// fmt.Println(batch.Create())
//replace with chain link
case signal := <-batchSignal:
if signal == true {
fmt.Println("Signal triggered")
ns, bs := batch.Fetch()
buf := &bytes.Buffer{}
gob.NewEncoder(buf).Encode(bs)
bb := buf.Bytes()
cid := data.Publish(bb)
data.Pin(cid)
fmt.Printf("Batch published at: %s \n", cid)
// add to ipfs
// add to chain
// announce to pubsub
//fmt.Println("Nullifiers:")
//fmt.Println(n)
//fmt.Println("Batch:")
//fmt.Println(b)
batch.Compact(ns)
}
default:
continue
}
}
}

+ 64
- 0
net/net.go

@ -0,0 +1,64 @@
package net
import (
"encoding/json"
"fmt"
"net/http"
"io"
"github.com/vocdoni/dvote-relay/batch"
"github.com/vocdoni/dvote-relay/types"
)
func parse(rw http.ResponseWriter, request *http.Request) {
decoder := json.NewDecoder(request.Body)
var e types.Envelope
var b types.Ballot
err := decoder.Decode(&e)
if err != nil {
panic(err)
}
err = json.Unmarshal(e.Ballot, &b)
if err != nil {
panic(err)
}
//check PoW
//check key
//decrypt
//check franchise
//construct packet
//this should should be randomized, or actually taken from input
//b.PID = "1"
//b.Nullifier = []byte{1,2,3}
//b.Vote = []byte{4,5,6}
//b.Franchise = []byte{7,8,9}
err = batch.Add(b)
if err != nil {
panic(err)
}
j, err := json.Marshal(e)
if err != nil {
panic(err)
}
io.WriteString(rw, string(j))
}
func Listen(port string) {
http.HandleFunc("/submit", parse)
//add waitgroup
func() {
fmt.Println("serving on " + port)
err := http.ListenAndServe(":" + port, nil)
if err != nil {
panic("ListenAndServe: " + err.Error())
}
}()
}

+ 52
- 0
net/net_test.go

@ -0,0 +1,52 @@
package net
import (
"testing"
"encoding/json"
"net/http"
"fmt"
"time"
"bytes"
"io/ioutil"
)
//func generateSubmission() submission {
//}
func TestListen(t *testing.T) {
t.Log("Testing listener")
testSubmission := submission {
"package",
[]byte("012345678"),
[]byte("012345678"),
[]byte("012345678"),
time.Now(),
}
listen("8080")
url := "http://localhost:8080/submit"
fmt.Println("URL:>", url)
j, err := json.Marshal(testSubmission)
if err != nil {
t.Errorf("Bad test JSON: %s", err)
}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(j))
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
t.Errorf("Error in client: %s", err)
}
defer resp.Body.Close()
fmt.Println("response Status:", resp.Status)
fmt.Println("response Headers:", resp.Header)
body, _ := ioutil.ReadAll(resp.Body)
fmt.Println("response Body:", string(body))
}

+ 31
- 0
types/types.go

@ -0,0 +1,31 @@
package types
import (
"time"
)
type Ballot struct {
Type string
PID string
Nullifier []byte
Vote []byte
Franchise []byte
}
type Envelope struct {
Type string
Nonce uint64
KeyProof []byte
Ballot []byte
Timestamp time.Time
}
type Batch struct {
Type string
Nullifiers []string
URL string
TXID string
Nonce []byte
Signature string
}

Loading…
Cancel
Save