
前言
上次我們用java實作了默克樹,這次我們用java基于t-io框架實作區塊鏈中的p2p網路構建,實作通信的功能,當然,實作p2p也可以基于WebSocket!下次我們再來實作一下,區塊鏈 P2P閑聊
P2P Peer-to-Peer ,最早起源于 1997 年, otlin ommunication 公司研制了能讓用戶從別人電腦中下載內容的軟體,這便是最早 P2P 當時P2P 網路一詞 的定義也與此相關,即網路是一類允許 組用戶互相連接并直接從用戶硬碟上獲取檔案的 網路 Hotlin 曾一度成 P2P 網路的代名詞,曾有文章 以“ tlin- The Gl01y Days Of P2P 為題介紹當時的盛況隨著互聯網技術的發展 P2P 演化成了一種分布式網路 在分布式網路 ,網路的各個節點,無論是機構還是個人可以共享們所擁有的 部分軟 硬體資源如資料處理能力、資訊存盤能力、互聯網連接能力、列印機等, P2P 分布式網路這些共享資源能被其他對 節點( peer )直接訪問,無須 過中間的服務器在區塊鏈領域,不同的公鏈系統所使用的 P2P網路技術大不相同,國內迅雷發布的區塊鏈檔案系統 TCFS 是基于 IPFS 實作的,位元幣系統的 P2P 絡是無結構的,而以太坊的 P2P 絡是有結構的,在以太坊中, P2P 網路主要采用 Kademlia 演算法實作, Kademlia 是分布式散串列( Distributed Hash Table, DHT )技術的一種,借助該技術,以太坊系統實作了在分布式環境下快速準確地路由和定位資料,分布式散串列是 個由廣域網內大 節點共同維護的巨大散串列,這張散串列被分割成不連續的塊,每個節點被分配給 個屬于自己的散列塊,并成為這個散列塊的管理者, DHT 的節點是動態的,在節點中,通過加密散列函式,物件的名字或關鍵詞就可以被映射為 128 位或 160 位的散列值,目前, Tapest可、 Chord CANPas 都使用了分布式散串列技術當然,不同結構的P2P 網路,有不同的優點和缺點,位元幣系統的網路結構是最容易理解、最容易實作的種形式,而以太坊網路引入了分布式散表、異或距離、二叉前綴樹等,結構上更復雜,實作起來也更困難,
一、t-io是什么?

常見應用場景:
IM(官方提供了im例子,含web端)
實時監控
推送服務(已內置API)
RPC
游戲
物聯網(已有很多案例)
其它實時通訊型別的場景,不一一列舉
t-io是一個網路框架,從這一點來說是有點像 netty 的,但 t-io 為常見和網路相關的業務(如 IM、訊息推送、RPC、監控)提供了近乎于現成的解決方案,即豐富的編程 API,極大減少業務層的編程難度
官方地址(有興趣的可以研究一下):https://www.tiocloud.com/2/product/tio.html
提示:目前t-io的檔案已經收費!但是對于學習能力強的小伙伴看看demo就能明白,能簡單的上手,
二、使用步驟
1.引入庫
maven依賴:
<!-- https://mvnrepository.com/artifact/org.t-io/tio-core -->
<dependency>
<groupId>org.t-io</groupId>
<artifactId>tio-core</artifactId>
<version>3.7.0.v20201010-RELEASE</version>
</dependency>
2.服務端代碼

圖片來自:https://blog.csdn.net/qwerdfgg/article/details/106017467?biz_id=102&spm=1018.2118.3001.4449
代碼如下:
package org.xiangbiao.p2p.server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.GroupContext;
import org.tio.core.Node;
import org.tio.core.Tio;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.server.intf.ServerAioHandler;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
/**
* MyServerAioHandler
* @author larry.xiang
*/
public class MyServerAioHandler implements ServerAioHandler {
private static final Logger logger = LoggerFactory.getLogger(MyServerAioHandler.class);
public Packet decode(ByteBuffer byteBuffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {
if (readableLength < ServerPacket.PACKET_HEADER_LENGTH) {
return null;
}
int bodyLength = byteBuffer.getInt();
if (bodyLength < 0) {
throw new AioDecodeException("body length[" + bodyLength + "] is invalid. romote: " + channelContext.getServerNode());
}
int len = bodyLength + ServerPacket.PACKET_HEADER_LENGTH;
if (len > readableLength) {
return null;
} else {
byte[] bytes = new byte[len];
int i = 0;
while(true){
if(byteBuffer.remaining() == 0){
break;
}
byte b = byteBuffer.get();
bytes[i++] = b;
}
ServerPacket serverPacket = new ServerPacket();
serverPacket.setBody(bytes);
return serverPacket;
}
}
public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
ServerPacket serverPacket = (ServerPacket) packet;
byte[] body = serverPacket.getBody();
int bodyLength = 0;
if(body != null){
bodyLength = body.length;
}
ByteBuffer byteBuffer = ByteBuffer.allocate(bodyLength + ServerPacket.PACKET_HEADER_LENGTH);
byteBuffer.order(groupContext.getByteOrder());
byteBuffer.putInt(bodyLength);
if (body != null) {
byteBuffer.put(body);
}
String bodyStr = null;
try {
bodyStr = new String(body, "utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("bodyStr2:"+bodyStr);
return byteBuffer;
}
public void handler(Packet packet, ChannelContext channelContext) throws Exception {
channelContext.setServerNode(new Node("127.0.0.1", ServerPacket.PORT));
ServerPacket serverPacket = (ServerPacket) packet;
byte[] body = serverPacket.getBody();
if(body != null){
String bodyStr = new String(body, "utf-8");
ServerPacket serverPacket1 = new ServerPacket();
serverPacket1.setBody(("receive from ["+ channelContext.getClientNode() + "]: " + bodyStr).getBytes("utf-8"));
Tio.send(channelContext, serverPacket1);
}
}
}
package org.xiangbiao.p2p.server;
import org.tio.core.ChannelContext;
import org.tio.core.intf.Packet;
import org.tio.server.intf.ServerAioListener;
/**
* MyServerAioListener
* @author larry.xiang
*/
public class MyServerAioListener implements ServerAioListener {
public void onAfterConnected(ChannelContext channelContext, boolean b, boolean b1) throws Exception {
}
public void onAfterDecoded(ChannelContext channelContext, Packet packet, int i) throws Exception {
}
public void onAfterReceivedBytes(ChannelContext channelContext, int i) throws Exception {
}
public void onAfterSent(ChannelContext channelContext, Packet packet, boolean b) throws Exception {
}
public void onAfterHandled(ChannelContext channelContext, Packet packet, long l) throws Exception {
}
public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String s, boolean b) throws Exception {
}
}
package org.xiangbiao.p2p.server;
import org.tio.core.intf.Packet;
/**
* 服務包物體
* @author larry.xiang
*/
public class ServerPacket extends Packet {
public static final Integer PACKET_HEADER_LENGTH = 4;
public static final Integer PORT = 8999;
byte[] body;//資料
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
}
package org.xiangbiao.p2p.server;
import org.tio.core.Tio;
import org.tio.server.ServerGroupContext;
import org.tio.server.TioServer;
import java.io.IOException;
/**
* 服務端啟動類
* @author larry.xiang
*/
public class TestServer {
public static void main(String[] args) throws IOException {
ServerGroupContext serverGroupContext = new ServerGroupContext("tio-blockchain", new MyServerAioHandler(), new MyServerAioListener());
TioServer server = new TioServer(serverGroupContext);
server.start("127.0.0.1", 8999);
}
}
3.客戶端代碼

圖片來自:https://blog.csdn.net/qwerdfgg/article/details/106017467?biz_id=102&spm=1018.2118.3001.4449
代碼如下:
package org.xiangbiao.p2p.client;
import org.tio.core.intf.Packet;
/**
* 客戶端包物體
* @author larry.xiang
*/
public class ClientPacket extends Packet {
public static final Integer PACKET_HEADER_LENGTH = 4;
private byte[] body;
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
}
package org.xiangbiao.p2p.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import org.tio.core.GroupContext;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import java.nio.ByteBuffer;
/**
* MyClientAioHander
* @author larry.xiang
*/
public class MyClientAioHander implements ClientAioHandler {
Logger logger = LoggerFactory.getLogger(MyClientAioHander.class);
@Override
public Packet heartbeatPacket() {
return new ClientPacket();
}
@Override
public Packet decode(ByteBuffer byteBuffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {
if(readableLength < ClientPacket.PACKET_HEADER_LENGTH){
return null;
}
int bodyLength = byteBuffer.getInt();
if(bodyLength < 0){
throw new AioDecodeException("body length is invalid.romote: " + channelContext.getServerNode());
}
int usefulLength = ClientPacket.PACKET_HEADER_LENGTH + bodyLength;
if(usefulLength > readableLength){
return null;
}else {
ClientPacket packet = new ClientPacket();
byte[] body = new byte[bodyLength];
byteBuffer.get(body);
packet.setBody(body);
return packet;
}
}
@Override
public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
ClientPacket clientPacket = (ClientPacket) packet;
byte[] body = clientPacket.getBody();
int bodyLength = 0;
if(body != null){
bodyLength = body.length;
}
int len = ClientPacket.PACKET_HEADER_LENGTH + bodyLength;
ByteBuffer byteBuffer = ByteBuffer.allocate(len);
byteBuffer.order(groupContext.getByteOrder());
byteBuffer.putInt(bodyLength);
if(body != null){
byteBuffer.put(body);
}
return byteBuffer;
}
@Override
public void handler(Packet packet, ChannelContext channelContext) throws Exception {
ClientPacket clientPacket = (ClientPacket) packet;
byte[] body = clientPacket.getBody();
if(body != null){
String bodyStr = new String(body, "utf-8");
logger.debug("客戶端收到訊息: " + bodyStr);
}
}
}
package org.xiangbiao.p2p.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.client.intf.ClientAioListener;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;
/**
* MyClientAioListener
* @author larry.xiang
*/
public class MyClientAioListener implements ClientAioListener {
Logger logger = LoggerFactory.getLogger(MyClientAioListener.class);
private static Integer count = 0;
@Override
public void onAfterConnected(ChannelContext channelContext, boolean b, boolean b1) throws Exception {
logger.info("onAfterConnected!");
}
@Override
public void onAfterDecoded(ChannelContext channelContext, Packet packet, int i) throws Exception {
logger.info("onAfterDecoded...");
}
@Override
public void onAfterReceivedBytes(ChannelContext channelContext, int i) throws Exception {
logger.info("onAfterReceivedBytes-------------------" + i);
}
@Override
public void onAfterSent(ChannelContext channelContext, Packet packet, boolean b) throws Exception {
logger.info("onAfterSent...");
}
@Override
public void onAfterHandled(ChannelContext channelContext, Packet packet, long l) throws Exception {
System.out.println("onAfterHandled...");
ClientPacket clientPacket = (ClientPacket) packet;
String resData = new String(clientPacket.getBody(), "utf-8");
logger.info("[" + channelContext.getServerNode() + "]: " + resData);
count++;
((ClientPacket) packet).setBody(("[" + channelContext.getServerNode() + "]: " + count).getBytes());
Thread.sleep(5000);
Tio.send(channelContext, packet);
}
@Override
public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String s, boolean b) throws Exception {
logger.error(throwable.getMessage());
logger.info(s);
}
}
package org.xiangbiao.p2p.client;
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientGroupContext;
import org.tio.client.TioClient;
import org.tio.core.Node;
import org.tio.core.Tio;
/**
*客戶端測驗
* @author larry.xiang
*/
public class TestClient
{
public static void main( String[] args ) throws Exception {
ClientGroupContext clientGroupContext = new ClientGroupContext(new MyClientAioHander(), new MyClientAioListener());
TioClient tioClient = new TioClient(clientGroupContext);
ClientChannelContext clientChannelContext = tioClient.connect(new Node("127.0.0.1", 8999));
ClientPacket clientPacket = new ClientPacket();
// 模擬區塊資訊同步
String blockStr="{blockHash: 0x16872b8dc1039c4e2ccc114cc1bfecb6fffe011904f2ccc66ac154a1629c331c\\n\" +\"blockNumber: 0x25\\n\" +\n" +
" \"gas: 0x5f5e100\\n\" +\n" +
" \"from: 0x8dbfde6445d39de6b84b5f5c445be00f8fb755cf\\n\" +\n" +
" \"transactionIndex: 0x0\\n\" +\n" +
" \"to: 0xef860c28e70e59d1f775a24b38b90bde1b317ad2\\n\" +\n" +
" \"nonce: 0xbcd86aae2291d3dac96b02f4834e6b2db218f91e9022d7022c7fc7a679d67f\\n\" +\n" +
" \"value: 0x0\\n\" +\n" +
" \"hash: 0xdb8234eeabd623cf15fd3193e79b1b7bb169c986b2ea193c6736dfdb0a914790\\n\" +\n" +
" \"gasPrice: 0x1\\n\" + \"input:xxxxxxlarry }\"";
clientPacket.setBody(blockStr.getBytes("utf-8"));
Tio.send(clientChannelContext, clientPacket);
}
}
測驗結果如下:

可以拿到模擬的區塊資訊
總結
這次我們用java基于t-io框架實作區塊鏈中的p2p網路構建,實作通信的功能,初步模擬了區塊資訊同步的demo,但是在實作的底層中遠遠比我們的要復雜,但是其原理不變,轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/197284.html
標籤:python
上一篇:【筆記】前端面試必備技巧——概述
