You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

168 lines
5.4 KiB

// Copyright 2017-2018 DERO Project. All rights reserved.
// Use of this source code in any form is governed by RESEARCH license.
// license can be found in the LICENSE file.
// GPG: 0F39 E425 8C65 3947 702A 8234 08B2 0360 A03A 9DE8
//
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package p2pv2
import "io"
import "net"
//import "sync"
import "time"
import "runtime/debug"
import "container/list"
import "encoding/binary"
import log "github.com/sirupsen/logrus"
import "github.com/romana/rlog"
import "github.com/vmihailenco/msgpack"
// this function waits for any commands from the connection and suitably responds doing sanity checks
// this is the core of the p2p package
/* this is the entire connection handler, all incoming/outgoing connections end up here */
func Handle_Connection(conn net.Conn, remote_addr *net.TCPAddr, incoming bool) {
var connection Connection
connection.Incoming = incoming
connection.Conn = conn
var idle int
connection.Addr = remote_addr // since we may be connecting via socks, get target IP
connection.Command_queue = list.New() // init command queue
connection.State = HANDSHAKE_PENDING
if incoming {
connection.logger = logger.WithFields(log.Fields{"RIP": remote_addr.String(), "DIR": "INC", "V2": "1"})
} else {
connection.logger = logger.WithFields(log.Fields{"RIP": remote_addr.String(), "DIR": "OUT", "V2": "1"})
}
defer func() {
if r := recover(); r != nil {
connection.logger.Warnf("Recovered while handling connection, Stack trace below", r)
connection.logger.Warnf("Stack trace \n%s", debug.Stack())
}
}()
Connection_Add(&connection) // add connection to pool
if !incoming { // we initiated the connection, we must send the handshake first
connection.Send_Handshake_Command() // send handshake
}
// goroutine to exit the connection if signalled
go func() {
ticker := time.NewTicker(1 * time.Second) // 1 second ticker
for {
select {
case <-ticker.C:
idle++
// if idle more than 13 secs, we should send a timed sync
if idle > 13 {
if connection.State != HANDSHAKE_PENDING {
connection.State = IDLE
}
//Send_Timed_Sync(&connection)
//connection.logger.Debugf("We should send a timed sync")
idle = 0
}
case <-Exit_Event: // p2p is shutting down, close the connection
connection.Exit = true
ticker.Stop() // release resources of timer
Connection_Delete(&connection)
conn.Close()
return // close the connection and close the routine
}
if connection.Exit { // release resources of timer
ticker.Stop()
Connection_Delete(&connection)
conn.Close()
return
}
}
}()
// the infinite loop handler
for {
length_buf := make([]byte, 4, 4) // prefix length is 4 bytes
if connection.Exit {
break
}
read_count, err := io.ReadFull(connection.Conn, length_buf)
if err != nil {
rlog.Tracef(2, "Error while reading command prefix length exiting err:%s\n", err)
connection.Exit = true
continue
}
length := binary.LittleEndian.Uint32(length_buf) // convert little endian bytes 4 bytes to length
// check safety of length, we should not allocate more than 100 MB as that is the limit of the block
command_buf := make([]byte, length, length)
//set_timeout(&connection) // we should not hang for hrs waiting for data to come
read_count, err = io.ReadFull(connection.Conn, command_buf)
if err != nil {
rlog.Tracef(2, "Error while reading command data exiting err:%s\n", err)
connection.Exit = true
continue
}
command_buf = command_buf[:read_count]
var dummy_command Common
err = msgpack.Unmarshal(command_buf, &dummy_command)
if err != nil {
rlog.Tracef(2, "Error while parsing command data exiting err:%s\n", err)
connection.Exit = true
continue
}
// if handshake not done, donot process any command
if !connection.HandShakeCompleted && dummy_command.Command != V2_COMMAND_HANDSHAKE {
rlog.Tracef(2, "Peer Sending something but we are waiting for handshake command data exiting err:%s\n", err)
connection.Exit = true
continue
}
switch dummy_command.Command {
case V2_COMMAND_HANDSHAKE:
var handshake Handshake
err = msgpack.Unmarshal(command_buf, &handshake)
if err != nil {
rlog.Tracef(2, "Error while parsing incoming handshake data exiting err:%s\n", err)
connection.Exit = true
continue
}
case V2_COMMAND_SYNC:
case V2_COMMAND_CHAIN_REQUEST:
case V2_COMMAND_CHAIN_RESPONSE: // this should be verified whether we are waiting for it
case V2_COMMAND_OBJECTS_REQUEST:
case V2_COMMAND_OBJECTS_RESPONSE: // this should be verified whether we are waiting for it
case V2_NOTIFY_NEW_BLOCK:
case V2_NOTIFY_NEW_TX:
}
}
}