|
|
// Copyright 2017-2018 DERO Project. All rights reserved.
// Use of this source code in any form is governed by RESEARCH license.
// license can be found in the LICENSE file.
// GPG: 0F39 E425 8C65 3947 702A 8234 08B2 0360 A03A 9DE8
//
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package p2pv2
import "io" import "net"
//import "sync"
import "time" import "runtime/debug" import "container/list" import "encoding/binary"
import log "github.com/sirupsen/logrus" import "github.com/romana/rlog" import "github.com/vmihailenco/msgpack"
// this function waits for any commands from the connection and suitably responds doing sanity checks
// this is the core of the p2p package
/* this is the entire connection handler, all incoming/outgoing connections end up here */ func Handle_Connection(conn net.Conn, remote_addr *net.TCPAddr, incoming bool) {
var connection Connection connection.Incoming = incoming connection.Conn = conn var idle int
connection.Addr = remote_addr // since we may be connecting via socks, get target IP
connection.Command_queue = list.New() // init command queue
connection.State = HANDSHAKE_PENDING if incoming { connection.logger = logger.WithFields(log.Fields{"RIP": remote_addr.String(), "DIR": "INC", "V2": "1"}) } else { connection.logger = logger.WithFields(log.Fields{"RIP": remote_addr.String(), "DIR": "OUT", "V2": "1"}) }
defer func() { if r := recover(); r != nil { connection.logger.Warnf("Recovered while handling connection, Stack trace below", r) connection.logger.Warnf("Stack trace \n%s", debug.Stack())
} }()
Connection_Add(&connection) // add connection to pool
if !incoming { // we initiated the connection, we must send the handshake first
connection.Send_Handshake_Command() // send handshake
}
// goroutine to exit the connection if signalled
go func() { ticker := time.NewTicker(1 * time.Second) // 1 second ticker
for { select { case <-ticker.C: idle++ // if idle more than 13 secs, we should send a timed sync
if idle > 13 { if connection.State != HANDSHAKE_PENDING { connection.State = IDLE } //Send_Timed_Sync(&connection)
//connection.logger.Debugf("We should send a timed sync")
idle = 0 } case <-Exit_Event: // p2p is shutting down, close the connection
connection.Exit = true ticker.Stop() // release resources of timer
Connection_Delete(&connection) conn.Close() return // close the connection and close the routine
} if connection.Exit { // release resources of timer
ticker.Stop() Connection_Delete(&connection) conn.Close() return } } }()
// the infinite loop handler
for { length_buf := make([]byte, 4, 4) // prefix length is 4 bytes
if connection.Exit { break }
read_count, err := io.ReadFull(connection.Conn, length_buf) if err != nil { rlog.Tracef(2, "Error while reading command prefix length exiting err:%s\n", err) connection.Exit = true continue }
length := binary.LittleEndian.Uint32(length_buf) // convert little endian bytes 4 bytes to length
// check safety of length, we should not allocate more than 100 MB as that is the limit of the block
command_buf := make([]byte, length, length) //set_timeout(&connection) // we should not hang for hrs waiting for data to come
read_count, err = io.ReadFull(connection.Conn, command_buf) if err != nil { rlog.Tracef(2, "Error while reading command data exiting err:%s\n", err) connection.Exit = true continue }
command_buf = command_buf[:read_count] var dummy_command Common err = msgpack.Unmarshal(command_buf, &dummy_command) if err != nil { rlog.Tracef(2, "Error while parsing command data exiting err:%s\n", err) connection.Exit = true continue }
// if handshake not done, donot process any command
if !connection.HandShakeCompleted && dummy_command.Command != V2_COMMAND_HANDSHAKE { rlog.Tracef(2, "Peer Sending something but we are waiting for handshake command data exiting err:%s\n", err) connection.Exit = true continue }
switch dummy_command.Command { case V2_COMMAND_HANDSHAKE: var handshake Handshake err = msgpack.Unmarshal(command_buf, &handshake) if err != nil { rlog.Tracef(2, "Error while parsing incoming handshake data exiting err:%s\n", err) connection.Exit = true continue }
case V2_COMMAND_SYNC:
case V2_COMMAND_CHAIN_REQUEST:
case V2_COMMAND_CHAIN_RESPONSE: // this should be verified whether we are waiting for it
case V2_COMMAND_OBJECTS_REQUEST:
case V2_COMMAND_OBJECTS_RESPONSE: // this should be verified whether we are waiting for it
case V2_NOTIFY_NEW_BLOCK:
case V2_NOTIFY_NEW_TX:
}
}
}
|