Implementing Peer-to-Peer Apps - Sun, Nov 10, 2024
How to implement a peer-to-peer application
Implementing peer-to-peer applications is one of the most complex parts of building a distributed application. Most of the time, available solutions are either obscure or actually centralized implementations inaccurately marketed as peer-to-peer. (Central servers create a single point of failure, which renders the peer-to-peer network useless.)
That’s what led me to implement my own peer-to-peer software from scratch. So let’s go down the rabbit hole :)
The goal of this research is to create an app that fulfills the following properties:
- Decentralization: In a P2P network, there are no central servers. Instead, all nodes (peers) act as both clients and servers, serving and requesting information.
- Scalability: The P2P network must have many nodes to maintain decentralization and prevent single points of failure. It should scale accordingly to handle high traffic.
- Fault Tolerance: The network should not depend on any single participant, meaning a failure in one node should not harm the rest of the network and its operations.
- Autonomy: Each node must be self-organizing and function independently without relying on third parties.
- Peer Discovery: Nodes should be able to discover other nodes to stay updated with the rest of the network.
How Does It Work?
A peer-to-peer (P2P) architecture refers to a decentralized solution without dedicated clients or servers; instead, each node acts as both a server and a client. The two main types of P2P implementations are:
- Structured P2P networks: Nodes are organized into a specific topology, typically through a distributed hash table (DHT) that determines how messages are routed between them.
- Unstructured P2P networks: In contrast to structured P2P networks, unstructured networks have no restrictions on topology or communication, forming a mesh with random peer connections. Although less efficient, this free-form network offers more resilience and decentralization.
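To illustrate what "random peer connections" can mean in practice, the sketch below picks a handful of neighbors at random from a list of known peers. The function name `pickNeighbors`, the addresses, and the neighbor count are assumptions made for this example, not part of any particular protocol.

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickNeighbors returns up to k distinct peers chosen at random from known,
// excluding self. The input slice is not modified.
func pickNeighbors(self string, known []string, k int) []string {
	candidates := make([]string, 0, len(known))
	for _, p := range known {
		if p != self {
			candidates = append(candidates, p)
		}
	}
	rand.Shuffle(len(candidates), func(i, j int) {
		candidates[i], candidates[j] = candidates[j], candidates[i]
	})
	if k < 0 {
		k = 0
	}
	if k > len(candidates) {
		k = len(candidates)
	}
	return candidates[:k]
}

func main() {
	known := []string{"a:9000", "b:9000", "c:9000", "d:9000", "e:9000"}
	// Node a picks three random neighbors; the result differs between runs.
	fmt.Println(pickNeighbors("a:9000", known, 3))
}
```

Because every node chooses independently, no node needs a global view of the topology, which is the property the rest of this article relies on.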
This article focuses on unstructured P2P networks, since that is the type we will implement.
Peer Discovery
Peers need a mechanism for discovering each other to maintain decentralization and stay updated with the network. One approach would be to simply scan the internet for peers on a specific port and attempt to initiate a connection. While this may seem ideal and decentralized, it would be inefficient to scan the entire internet, and some firewalls might block servers sending high volumes of traffic as it could be mistaken for malicious activity.
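As a rough illustration of the scanning approach, the sketch below probes a list of candidate addresses concurrently and keeps those that accept a TCP connection. The function names, the timeout, and the demo addresses are assumptions made for this example; a real scanner would also need rate limiting to avoid the firewall problem described above.

```go
package main

import (
	"fmt"
	"net"
	"sort"
	"sync"
	"time"
)

// probeTimeout is an arbitrary per-address timeout chosen for this sketch.
const probeTimeout = 500 * time.Millisecond

// probePeers dials every candidate address concurrently and returns the
// addresses that accepted a TCP connection, sorted for stable output.
func probePeers(candidates []string, timeout time.Duration) []string {
	var (
		mu        sync.Mutex
		wg        sync.WaitGroup
		reachable []string
	)
	for _, addr := range candidates {
		wg.Add(1)
		go func(addr string) {
			defer wg.Done()
			conn, err := net.DialTimeout("tcp", addr, timeout)
			if err != nil {
				return // Closed port, filtered port, or no host at all
			}
			conn.Close()
			mu.Lock()
			reachable = append(reachable, addr)
			mu.Unlock()
		}(addr)
	}
	wg.Wait()
	sort.Strings(reachable)
	return reachable
}

// startLocalPeer opens a listener on a free local port so the demo has
// something to find. It returns the address and a function that stops it.
func startLocalPeer() (string, func(), error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", nil, err
	}
	return ln.Addr().String(), func() { ln.Close() }, nil
}

func main() {
	addr, stop, err := startLocalPeer()
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer stop()

	// One live peer and one port that is very likely closed.
	found := probePeers([]string{addr, "127.0.0.1:1"}, probeTimeout)
	fmt.Println("reachable peers:", found)
}
```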
Another way is through bootstrap nodes that act as trusted seed nodes, tracking all connected peers in the network. This implementation is fast but centralized, requiring trust in the bootstrap node operators to remain available. Bitcoin and Ethereum blockchains operate this way but with multiple bootstrap nodes located in various geographic areas and maintained by different entities.
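To make the bootstrap idea concrete, here is a minimal in-memory sketch of the bookkeeping a seed node might do: it records each peer that announces itself and hands back the peers it already knows. `SeedRegistry`, `Announce`, and the example addresses are hypothetical names for illustration, and the network transport is deliberately left out.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// SeedRegistry is a hypothetical bootstrap node's view of the network.
type SeedRegistry struct {
	mu    sync.Mutex
	peers map[string]struct{}
}

// NewSeedRegistry returns an empty registry.
func NewSeedRegistry() *SeedRegistry {
	return &SeedRegistry{peers: make(map[string]struct{})}
}

// Announce registers addr and returns the other peers known so far,
// sorted, so the newcomer can connect to them.
func (r *SeedRegistry) Announce(addr string) []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	known := make([]string, 0, len(r.peers))
	for p := range r.peers {
		if p != addr {
			known = append(known, p)
		}
	}
	sort.Strings(known)
	r.peers[addr] = struct{}{}
	return known
}

func main() {
	seed := NewSeedRegistry()
	fmt.Println(seed.Announce("10.0.0.1:9000")) // prints []
	fmt.Println(seed.Announce("10.0.0.2:9000")) // prints [10.0.0.1:9000]
	fmt.Println(seed.Announce("10.0.0.3:9000")) // prints [10.0.0.1:9000 10.0.0.2:9000]
}
```

The registry is the centralized part: if it disappears, new peers have nowhere to announce themselves, which is why running several independent seeds matters.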
A third approach is a global communication channel where nodes advertise themselves to the network, allowing others to connect. Bitcoin nodes initially used the #bitcoin Freenode IRC channel for this, but the approach depends on IRC’s availability, which creates a single point of failure.
Technologies
I started by researching current solutions and examining what most people are using.
Currently, the most widespread solution for peer-to-peer communication is libp2p, a modular networking stack with implementations in many popular programming languages (Rust, Go, JavaScript, etc.). It is used in projects such as IPFS and Substrate, making it an attractive solution.
However, in my opinion, libp2p is over-engineered for our purpose: it suits blockchains, but we need lightweight microservices with minimal processing. So, I decided to write my own P2P implementation in Go.
The first implementation involved creating socket listeners and connecting to them to send and receive data. Later, as more functionality was needed, the basic sockets implementation became messy and challenging to manage.
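For context, a first socket-based version along these lines might look like the sketch below: one peer listens on TCP, the other connects, sends a newline-terminated message, and reads the reply. This is not the original implementation, only an assumed minimal shape of it; `startPeer`, `send`, and the line-based framing are choices made for the example.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strings"
)

// startPeer listens on a free local port and answers every line it receives
// with "ack: <line>". It returns the listening address and a stop function.
func startPeer() (string, func(), error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", nil, err
	}
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return // Listener was closed
			}
			go handle(conn)
		}
	}()
	return ln.Addr().String(), func() { ln.Close() }, nil
}

// handle serves one connection until the remote side closes it.
func handle(conn net.Conn) {
	defer conn.Close()
	scanner := bufio.NewScanner(conn)
	for scanner.Scan() {
		fmt.Fprintf(conn, "ack: %s\n", scanner.Text())
	}
}

// send connects to addr, writes one line, and returns the peer's reply.
func send(addr, msg string) (string, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return "", err
	}
	defer conn.Close()
	if _, err := fmt.Fprintf(conn, "%s\n", msg); err != nil {
		return "", err
	}
	reply, err := bufio.NewReader(conn).ReadString('\n')
	if err != nil {
		return "", err
	}
	return strings.TrimSpace(reply), nil
}

func main() {
	addr, stop, err := startPeer()
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer stop()

	reply, err := send(addr, "hello")
	fmt.Println(reply, err) // prints "ack: hello <nil>"
}
```

Every new message type needs its own parsing and dispatch on top of this, which is the part that becomes hard to manage as the protocol grows.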
At this point, I looked into other implementations that are both minimal and scalable, and I found gRPC.
gRPC uses Protocol Buffers (protobuf) as its interface definition language (IDL) and serialization format, which suits our needs since it is extensible and enforces strict data definitions. It also lets us generate code for our data structures in other languages, which is helpful if we need to switch codebases.
A basic protobuf looks like this:
```protobuf
// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings.
message HelloReply {
  string message = 1;
}
```
We can use the protobuf compiler to generate Go code for data parsing and serialization:
```shell
protoc --go_out=. --go_opt=paths=source_relative \
    --go-grpc_out=. --go-grpc_opt=paths=source_relative \
    helloworld/helloworld.proto
```
Then we implement the method like this:
```go
// SayHello implements the SayHello RPC of the Greeter service.
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
```
If we want to run peers in other languages, the process would be similar.
Another excellent feature of gRPC is bidirectional streaming, though it is beyond the scope of this article.
Let’s Write the Code
With the prerequisites in place, let’s start coding. First, we define a protobuf that specifies how our protocol will operate.
The following is a protobuf definition for a flooding P2P network where nodes broadcast messages to all connected nodes.
```protobuf
syntax = "proto3";

package peer2peer.node;

option go_package = "peer2peer/proto";

service Node {
  rpc connect(ConnectRequest) returns (ConnectResponse); // Connect to other nodes
  rpc flood(FloodRequest) returns (FloodResponse);       // Broadcast the message to all connected nodes
}

message ConnectRequest {
  string neighbor = 1; // Address of the connecting peer
}

message ConnectResponse {}

message FloodRequest {
  string source = 1;      // Flooder's address
  optional int64 ttl = 2; // Expiry time as a Unix timestamp in seconds
  string message = 3;     // Message to share
  string uid = 4;         // Message UID for tracking
}

message FloodResponse {
  bool status = 1;
}
```
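Before wiring this into gRPC, the flooding logic itself can be sketched in memory. In the example below each node remembers the UIDs it has seen, forwards a new message to every neighbor except the one it came from, and drops duplicates. The `simNode` type and the four-node topology are made up for illustration, direct method calls stand in for RPCs, and expiry via `ttl` is omitted.

```go
package main

import "fmt"

// simNode is a hypothetical in-memory stand-in for a peer.
type simNode struct {
	addr      string
	neighbors []*simNode
	seen      map[string]bool // UIDs already handled by this node
	delivered []string        // Messages accepted by this node
}

func newSimNode(addr string) *simNode {
	return &simNode{addr: addr, seen: make(map[string]bool)}
}

// link connects two nodes in both directions.
func link(a, b *simNode) {
	a.neighbors = append(a.neighbors, b)
	b.neighbors = append(b.neighbors, a)
}

// flood accepts the message once, then forwards it to every neighbor
// except the sender. It returns false if the UID was already seen.
func (n *simNode) flood(from, uid, msg string) bool {
	if n.seen[uid] {
		return false
	}
	n.seen[uid] = true
	n.delivered = append(n.delivered, msg)
	for _, nb := range n.neighbors {
		if nb.addr != from {
			nb.flood(n.addr, uid, msg)
		}
	}
	return true
}

func main() {
	a, b, c, d := newSimNode("a"), newSimNode("b"), newSimNode("c"), newSimNode("d")
	// A small mesh with a cycle: a-b, b-c, c-a, c-d.
	link(a, b)
	link(b, c)
	link(c, a)
	link(c, d)

	a.flood("", "msg-1", "hello mesh")
	for _, n := range []*simNode{a, b, c, d} {
		fmt.Printf("%s received %d message(s)\n", n.addr, len(n.delivered))
	}
	// Each node reports exactly 1 message despite the cycle.
}
```

Without the `seen` check, the cycle between a, b, and c would bounce the message around forever, which is why the request carries a UID.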
Now we can compile the protobuf and use it in Go.
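```go
package main

import (
	"context"
	"log"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "peer2peer/proto"
)

// ConnectedNodes maintains the list of connected nodes.
type ConnectedNodes struct {
	mu    sync.Mutex      // Protects Nodes against race conditions
	Nodes map[string]bool // Map of connected nodes
}

type Server struct {
	pb.UnimplementedNodeServer
	Neighbors    *ConnectedNodes
	myAddress    string
	mu           sync.Mutex      // Protects sentMessages
	sentMessages map[string]bool // Track seen message UIDs to prevent rebroadcasting
}

func (s *Server) Connect(ctx context.Context, in *pb.ConnectRequest) (*pb.ConnectResponse, error) {
	s.Neighbors.mu.Lock()
	defer s.Neighbors.mu.Unlock()
	_, exists := s.Neighbors.Nodes[in.GetNeighbor()] // Check if already connected
	if !exists {
		log.Printf("Received a connection from %s", in.GetNeighbor())
		s.Neighbors.Nodes[in.GetNeighbor()] = true // Update neighbors map
	}
	return &pb.ConnectResponse{}, nil
}

func (s *Server) Flood(ctx context.Context, in *pb.FloodRequest) (*pb.FloodResponse, error) {
	// Check if the message expired (ttl is a Unix timestamp in seconds)
	if time.Now().After(time.Unix(in.GetTtl(), 0)) {
		return &pb.FloodResponse{Status: false}, nil
	}

	// Check if the message was already broadcast, keyed by its UID
	s.mu.Lock()
	seen := s.sentMessages[in.GetUid()]
	s.sentMessages[in.GetUid()] = true
	s.mu.Unlock()
	if seen {
		return &pb.FloodResponse{Status: true}, nil
	}

	// Snapshot the neighbors so the lock is not held during network calls
	s.Neighbors.mu.Lock()
	neighbors := make([]string, 0, len(s.Neighbors.Nodes))
	for neighbor, status := range s.Neighbors.Nodes {
		if status && neighbor != in.GetSource() {
			neighbors = append(neighbors, neighbor)
		}
	}
	s.Neighbors.mu.Unlock()

	// Flood the topology with the message
	for _, neighbor := range neighbors {
		time.Sleep(3 * time.Second) // Pause between transmissions
		log.Printf("Transmitting to %s", neighbor)
		if err := forward(neighbor, in); err != nil {
			// Keep going so one failing neighbor does not stop the flood
			log.Printf("Can't broadcast to node %s: %v", neighbor, err)
		}
	}
	return &pb.FloodResponse{Status: true}, nil
}

// forward sends the flood request to a single neighbor.
func forward(neighbor string, in *pb.FloodRequest) error {
	conn, err := grpc.Dial(neighbor, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return err
	}
	defer conn.Close()
	// A fresh context keeps an upstream cancellation from stopping the flood
	_, err = pb.NewNodeClient(conn).Flood(context.Background(), in)
	return err
}
```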
This implementation is sufficient for a basic network where nodes rarely go down. In more complex situations, it is necessary to run periodic health checks to maintain a list of valid nodes.
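```go
func (s *Server) HealthCheck(ctx context.Context) {
	// Snapshot the neighbors so the lock is not held during network calls
	s.Neighbors.mu.Lock()
	neighbors := make([]string, 0, len(s.Neighbors.Nodes))
	for neighbor := range s.Neighbors.Nodes {
		neighbors = append(neighbors, neighbor)
	}
	s.Neighbors.mu.Unlock()

	healthyNeighbors := 0
	for _, neighbor := range neighbors {
		if err := s.ping(ctx, neighbor); err != nil {
			log.Printf("Neighbor %s is down: %v", neighbor, err)
			s.Neighbors.mu.Lock()
			delete(s.Neighbors.Nodes, neighbor) // Remove the down node
			s.Neighbors.mu.Unlock()
			continue
		}
		healthyNeighbors++
	}
	log.Printf("Node is connected to %d peers", healthyNeighbors)
}

// ping dials a neighbor and calls Connect to confirm it is reachable.
func (s *Server) ping(ctx context.Context, neighbor string) error {
	conn, err := grpc.Dial(neighbor, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return err
	}
	defer conn.Close()
	// grpc.Dial does not block by default, so an unreachable node only fails here
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()
	_, err = pb.NewNodeClient(conn).Connect(ctx, &pb.ConnectRequest{Neighbor: s.myAddress})
	return err
}
```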
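The function above performs a single pass, so something has to call it on a schedule. One possible way, sketched here with only the standard library, is a ticker loop that runs a check function until it is told to stop. `runEvery`, the interval, and the stand-in check are assumptions for the example; in the real node the callback would call `HealthCheck`.

```go
package main

import (
	"fmt"
	"time"
)

// runEvery calls check once per interval until stop is closed.
func runEvery(interval time.Duration, stop <-chan struct{}, check func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			check()
		case <-stop:
			return
		}
	}
}

func main() {
	stop := make(chan struct{})
	ran := make(chan struct{})

	go runEvery(20*time.Millisecond, stop, func() {
		// Stand-in for s.HealthCheck(ctx)
		fmt.Println("running health check")
		ran <- struct{}{}
	})

	// Let three checks run, then shut the loop down.
	for i := 0; i < 3; i++ {
		<-ran
	}
	close(stop)
}
```

A production interval would be much longer than 20 milliseconds; the short value only keeps the demo quick.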
Now, you can either fetch the seed nodes on startup and connect to those or scan the internal network for available nodes using a port scanner.
An exercise for the reader would be to use a Docker-Swarm-like solution to test the implementation over a private network with multiple subnets.