久久精品色妇熟妇丰满人妻99,久久久网,和少妇疯狂做爰过程,欧美系列亚洲系列国产系列

廊坊新聞網(wǎng)-主流媒體,廊坊城市門戶

【環(huán)球熱聞】自己動手實現(xiàn)rpc框架(一) 實現(xiàn)點對點的rpc通信

2023-06-27 00:22:02 來源:博客園
自己動手實現(xiàn)rpc框架(一) 實現(xiàn)點對點的rpc通信1. 什么是rpc?

RPC是遠過程調(diào)用(Remote Procedure Call)的縮寫形式,其區(qū)別于一個程序內(nèi)部基本的過程調(diào)用(或者叫函數(shù)/方法調(diào)用)。

隨著應(yīng)用程序變得越來越復(fù)雜,在單個機器上中僅通過一個進程來運行整個應(yīng)用程序的方式已經(jīng)難以滿足現(xiàn)實中日益增長的需求。開發(fā)者對應(yīng)用程序進行模塊化的拆分,以分布式部署的方式來降低程序整體的復(fù)雜度和提升性能方面的可拓展性(分而治之的思想)。


(資料圖)

拆分后部署在不同機器上的各個模塊無法像之前那樣通過內(nèi)存尋址的方式來互相訪問,而是需要通過網(wǎng)絡(luò)來進行通信。RPC最主要的功能就是在提供不同模塊服務(wù)間的網(wǎng)絡(luò)通信能力的同時,又盡可能的不丟失本地調(diào)用時語義的簡潔性。rpc可以認為是分布式系統(tǒng)中類似人體經(jīng)絡(luò)一樣的基礎(chǔ)設(shè)施,因此有必要對其工作原理有一定的了解。

2. MyRpc介紹

要學(xué)習(xí)rpc的原理,理論上最好的辦法就是去看流行的開源框架源碼。但dubbo這樣成熟的rpc框架由于已經(jīng)迭代了很多年,為了滿足多樣的需求而有著復(fù)雜的架構(gòu)和龐大的代碼量。對于普通初學(xué)者來說往往很難從層層抽象封裝中把握住關(guān)于rpc框架最核心的內(nèi)容。

MyRpc是我最近在學(xué)習(xí)MIT6.824分布式系統(tǒng)公開課時,使用java并基于netty實現(xiàn)的一個簡易rpc框架,實現(xiàn)的過程中許多地方都參考了dubbo以及一些demo級別的rpc框架。MyRpc是demo級別的框架,理解起來會輕松不少。在對基礎(chǔ)的rpc實現(xiàn)原理有一定了解后,能對后續(xù)研究dubbo等開源rpc框架帶來很大的幫助。

目前MyRpc實現(xiàn)了以下功能

網(wǎng)絡(luò)通信(netty做客戶端、服務(wù)端網(wǎng)絡(luò)交互,服務(wù)端使用一個線程池處理業(yè)務(wù)邏輯)實現(xiàn)消息的序列化(實現(xiàn)序列化方式的抽象,支持json、hessian、jdk序列化等)客戶端代理生成(目前只實現(xiàn)了jdk動態(tài)代理)服務(wù)注冊 + 注冊中心集成(實現(xiàn)注冊中心的抽象,但目前只支持用zookeeper做注冊中心)集群負載均衡策略(實現(xiàn)負載均衡策略的抽象,支持roundRobin輪訓(xùn),隨機等)使用時間輪,支持設(shè)置消費者調(diào)用超時時間

限于篇幅,以上功能會拆分為兩篇博客分別介紹。其中前3個功能實現(xiàn)了基本的點對點通信的rpc功能,將在本篇博客中結(jié)合源碼詳細分析。

MyRpc架構(gòu)圖

3. MyRpc源碼分析3.1 基于netty的極簡客戶端/服務(wù)端交互demo

MyRpc是以netty為基礎(chǔ)的,下面展示一個最基礎(chǔ)的netty客戶端/服務(wù)端交互的demo。

netty服務(wù)端:

/** * 最原始的netty服務(wù)端demo * */public class PureNettyServer {    public static void main(String[] args) throws InterruptedException {        ServerBootstrap bootstrap = new ServerBootstrap();        EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));        EventLoopGroup workerGroup = new NioEventLoopGroup(8,new DefaultThreadFactory("NettyServerWorker", true));        bootstrap.group(bossGroup, workerGroup)            .channel(NioServerSocketChannel.class)            .childHandler(new ChannelInitializer() {                @Override                protected void initChannel(SocketChannel socketChannel) {                    socketChannel.pipeline()                        // 實際調(diào)用業(yè)務(wù)方法的處理器                        .addLast("serverHandler", new SimpleChannelInboundHandler() {                            @Override                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf requestByteBuf) {                                String requestStr = requestByteBuf.toString(CharsetUtil.UTF_8);                                System.out.println("PureNettyServer read request=" + JsonUtil.json2Obj(requestStr, User.class));                                // 服務(wù)端響應(yīng)echo                                ByteBuf byteBuf = Unpooled.copiedBuffer("echo:" + requestStr,CharsetUtil.UTF_8);                                channelHandlerContext.writeAndFlush(byteBuf);                            }                        })                    ;                }            });        ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 8888).sync();        System.out.println("netty server started!");        // 一直阻塞在這里        channelFuture.channel().closeFuture().sync();    }}

netty客戶端:

/** * 最原始的netty客戶端demo * */public class PureNettyClient {    public static void main(String[] args) throws InterruptedException {        Bootstrap bootstrap = new Bootstrap();        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(8,            new DefaultThreadFactory("NettyClientWorker", true));        bootstrap.group(eventLoopGroup)            .channel(NioSocketChannel.class)            .handler(new ChannelInitializer() {                @Override                protected void initChannel(SocketChannel socketChannel) {                    socketChannel.pipeline()                        .addLast("clientHandler", new SimpleChannelInboundHandler() {                            @Override                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf responseByteBuf) {                                String responseStr = responseByteBuf.toString(CharsetUtil.UTF_8);                                System.out.println("PureNettyClient received response=" + responseStr);                            }                        })                    ;                }            });        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();        Channel channel = channelFuture.sync().channel();        // 發(fā)送一個user對象的json串        User user = new User("Tom",10);        ByteBuf requestByteBuf = Unpooled.copiedBuffer(JsonUtil.obj2Str(user), CharsetUtil.UTF_8);        channel.writeAndFlush(requestByteBuf);        System.out.println("netty client send request success!");        channelFuture.channel().closeFuture().sync();    }}
demo示例中,netty的服務(wù)端啟動后綁定在本機127.0.0.1的8888端口上,等待來自客戶端的連接。netty客戶端向服務(wù)端發(fā)起連接請求,在成功建立連接后向服務(wù)端發(fā)送了一個User對象字符串對應(yīng)的字節(jié)數(shù)組。服務(wù)端在接受到這一字節(jié)數(shù)組后反序列化為User對象并打印在控制臺,隨后echo響應(yīng)了一個字符串??蛻舳嗽诮邮艿巾憫?yīng)后,將echo字符串打印在了控制臺上3.2 設(shè)計MyRpc通信協(xié)議,解決黏包/拆包問題

上面展示了一個最基礎(chǔ)的netty網(wǎng)絡(luò)通信的demo,似乎一個點對點的傳輸功能已經(jīng)得到了良好的實現(xiàn)。但作為一個rpc框架,還需要解決tcp傳輸層基于字節(jié)流的消息黏包/拆包問題。

黏包/拆包問題介紹

操作系統(tǒng)實現(xiàn)的傳輸層tcp協(xié)議中,向上層的應(yīng)用保證盡最大可能的(best effort delivery)、可靠的傳輸字節(jié)流,但并不關(guān)心實際傳輸?shù)臄?shù)據(jù)包是否總是符合應(yīng)用層的要求。

黏包問題:假設(shè)應(yīng)用層發(fā)送的一次請求數(shù)據(jù)量比較小(比如0.1kb),tcp層可能不會在接到應(yīng)用請求后立即進行傳輸,而是會稍微等待一小會。這樣如果應(yīng)用層在短時間內(nèi)需要傳輸多次0.1kb的請求,就可以攢在一起批量傳輸,傳輸效率會高很多。但這帶來的問題就是接收端一次接受到的數(shù)據(jù)包內(nèi)應(yīng)用程序邏輯上的多次請求黏連在了一起,需要通過一些方法來將其拆分還原為一個個獨立的信息給應(yīng)用層。拆包問題:假設(shè)應(yīng)用層發(fā)送的一次請求數(shù)據(jù)量比較大(比如100Mb),而tcp層的數(shù)據(jù)包容量的最大值是有限的,所以應(yīng)用層較大的一次請求數(shù)據(jù)會被拆分為多個包分開發(fā)送。這就導(dǎo)致接收端接受到的某個數(shù)據(jù)包其實并不是完整的應(yīng)用層請求數(shù)據(jù),沒法直接交給應(yīng)用程序去使用,而必須等待后續(xù)對應(yīng)請求的所有數(shù)據(jù)包都接受完成后,才能組裝成完整的請求對象再交給應(yīng)用層處理??梢钥吹?,上述的黏包/拆包問題并不能看做是tcp的問題,而是應(yīng)用層最終需求與tcp傳輸層功能不匹配導(dǎo)致的問題。tcp出于傳輸效率的考慮無法解決這個問題,所以黏包拆包問題最終只能在更上面的應(yīng)用層自己來處理。

一個數(shù)據(jù)包中可能同時存在黏包問題和拆包問題(如下圖所示)黏包拆包示意圖

黏包/拆包問題解決方案

解決黏包/拆包問題最核心的思路是,如何知道一個應(yīng)用層完整請求的邊界。對于黏包問題,基于邊界可以獨立的拆分出每一個請求;對于拆包問題,如果發(fā)現(xiàn)收到的數(shù)據(jù)包末尾沒有邊界,則繼續(xù)等待新的數(shù)據(jù)包,直到發(fā)現(xiàn)邊界后再一并上交給應(yīng)用程序。

主流的解決黏包拆包的應(yīng)用層協(xié)議設(shè)計方案有三種:

介紹優(yōu)點缺點
1.基于固定長度的協(xié)議每個消息都是固定的大小,如果實際上小于固定值,則需要填充簡單;易于實現(xiàn)固定值過大,填充會浪費大量傳輸帶寬;固定值過小則限制了可用的消息體大小
2.基于特殊分隔符的協(xié)議約定一個特殊的分隔符,以這個分割符為消息邊界簡單;且消息體長度是可變的,性能好消息體的業(yè)務(wù)數(shù)據(jù)不允許包含這個特殊分隔符,否則會錯誤的拆分數(shù)據(jù)包。因此兼容性較差
3.基于業(yè)務(wù)數(shù)據(jù)長度編碼的協(xié)議設(shè)計一個固定大小的消息請求頭(比如固定16字節(jié)、20字節(jié)大小),在消息請求頭中包含實際的業(yè)務(wù)消息體長度消息體長度可變,性能好;對業(yè)務(wù)數(shù)據(jù)內(nèi)容無限制,兼容性也好實現(xiàn)起來稍顯復(fù)雜

對于流行的rpc框架,一般都是選用性能與兼容性皆有的方案3:即自己設(shè)計一個固定大小的、包含了請求體長度字段的請求頭。MyRpc參考dubbo,也設(shè)計了一個固定16字節(jié)大小的請求頭(里面有幾個字段暫時沒用上)。

請求頭: MessageHeader

/** * 共16字節(jié)的請求頭 * */public class MessageHeader implements Serializable {    public static final int MESSAGE_HEADER_LENGTH = 16;    public static final int MESSAGE_SERIALIZE_TYPE_LENGTH = 5;    public static final short MAGIC = (short)0x2233;    // ================================ 消息頭 =================================    /**     * 魔數(shù)(占2字節(jié))     * */    private short magicNumber = MAGIC;    /**     * 消息標識(0代表請求事件;1代表響應(yīng)事件, 占1位)     * @see MessageFlagEnums     * */    private Boolean messageFlag;    /**     * 是否是雙向請求(0代表oneWay請求;1代表twoWay請求)     * (雙向代表客戶端會等待服務(wù)端的響應(yīng),單向則請求發(fā)送完成后即向上層返回成功)     * */    private Boolean twoWayFlag;    /**     * 是否是心跳消息(0代表正常消息;1代表心跳消息, 占1位)     * */    private Boolean eventFlag;    /**     * 消息體序列化類型(占5位,即所支持的序列化類型不得超過2的5次方,32種)     * @see MessageSerializeType     * */    private Boolean[] serializeType;    /**     * 響應(yīng)狀態(tài)(占1字節(jié))     * */    private byte responseStatus;    /**     * 消息的唯一id(占8字節(jié))     * */    private long messageId;    /**     * 業(yè)務(wù)數(shù)據(jù)長度(占4字節(jié))     * */    private int bizDataLength;}

完整的消息對象: MessageProtocol

public class MessageProtocol implements Serializable {    /**     * 請求頭     * */    private MessageHeader messageHeader;    /**     * 請求體(實際的業(yè)務(wù)消息對象)     * */    private T bizDataBody;}

MyRpc消息示例圖

rpc請求/響應(yīng)對象
/** * rpc請求對象 * */public class RpcRequest implements Serializable {    private static final AtomicLong INVOKE_ID = new AtomicLong(0);    /**     * 消息的唯一id(占8字節(jié))     * */    private final long messageId;    /**     * 接口名     * */    private String interfaceName;    /**     * 方法名     * */    private String methodName;    /**     * 參數(shù)類型數(shù)組(每個參數(shù)一項)     * */    private Class[] parameterClasses;    /**     * 實際參數(shù)對象數(shù)組(每個參數(shù)一項)     * */    private Object[] params;    public RpcRequest() {        // 每個請求對象生成時都自動生成單機全局唯一的自增id        this.messageId = INVOKE_ID.getAndIncrement();    }}
/** * rpc響應(yīng)對象 * */public class RpcResponse implements Serializable {    /**     * 消息的唯一id(占8字節(jié))     * */    private long messageId;    /**     * 返回值     */    private Object returnValue;    /**     * 異常值     */    private Exception exceptionValue;}
處理自定義消息的netty編解碼器

在上一節(jié)的netty demo中的消息處理器中,一共做了兩件事情;一是將原始數(shù)據(jù)包的字節(jié)流轉(zhuǎn)化成了應(yīng)用程序所需的String對象;二是拿到String對象后進行響應(yīng)的業(yè)務(wù)處理(比如打印在控制臺上)。而netty框架允許配置多個消息處理器組成鏈條,按約定的順序處理出站/入站的消息;因此從模塊化的出發(fā),應(yīng)該將編碼/解碼的邏輯和實際業(yè)務(wù)的處理拆分成多個處理器。

在自定義的消息編碼器、解碼器中進行應(yīng)用層請求/響應(yīng)數(shù)據(jù)的序列化/反序列化,同時處理上述的黏包/拆包問題。

編解碼工具類

public class MessageCodecUtil {    /**     * 報文協(xié)議編碼     * */    public static  void messageEncode(MessageProtocol messageProtocol, ByteBuf byteBuf) {        MessageHeader messageHeader = messageProtocol.getMessageHeader();        // 寫入魔數(shù)        byteBuf.writeShort(MessageHeader.MAGIC);        // 寫入消息標識        byteBuf.writeBoolean(messageHeader.getMessageFlag());        // 寫入單/雙向標識        byteBuf.writeBoolean(messageHeader.getTwoWayFlag());        // 寫入消息事件標識        byteBuf.writeBoolean(messageHeader.getEventFlag());        // 寫入序列化類型        for(boolean b : messageHeader.getSerializeType()){            byteBuf.writeBoolean(b);        }        // 寫入響應(yīng)狀態(tài)        byteBuf.writeByte(messageHeader.getResponseStatus());        // 寫入消息uuid        byteBuf.writeLong(messageHeader.getMessageId());        // 序列化消息體        MyRpcSerializer myRpcSerializer = MyRpcSerializerManager.getSerializer(messageHeader.getSerializeType());        byte[] bizMessageBytes = myRpcSerializer.serialize(messageProtocol.getBizDataBody());        // 獲得并寫入消息正文長度        byteBuf.writeInt(bizMessageBytes.length);        // 寫入消息正文內(nèi)容        byteBuf.writeBytes(bizMessageBytes);    }    /**     * 報文協(xié)議header頭解碼     * */    public static MessageHeader messageHeaderDecode(ByteBuf byteBuf){        MessageHeader messageHeader = new MessageHeader();        // 讀取魔數(shù)        messageHeader.setMagicNumber(byteBuf.readShort());        // 讀取消息標識        messageHeader.setMessageFlag(byteBuf.readBoolean());        // 讀取單/雙向標識        messageHeader.setTwoWayFlag(byteBuf.readBoolean());        // 讀取消息事件標識        messageHeader.setEventFlag(byteBuf.readBoolean());        // 讀取序列化類型        Boolean[] serializeTypeBytes = new Boolean[MessageHeader.MESSAGE_SERIALIZE_TYPE_LENGTH];        for(int i=0; i T messageBizDataDecode(MessageHeader messageHeader, ByteBuf byteBuf, Class messageBizDataType){        // 讀取消息正文        byte[] bizDataBytes = new byte[messageHeader.getBizDataLength()];        byteBuf.readBytes(bizDataBytes);        // 反序列化消息體        MyRpcSerializer myRpcSerializer = MyRpcSerializerManager.getSerializer(messageHeader.getSerializeType());        return (T) myRpcSerializer.deserialize(bizDataBytes,messageBizDataType);    }}

自定義編碼器: NettyEncoder

public class NettyEncoder extends MessageToByteEncoder> {    @Override    protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol, ByteBuf byteBuf) {        // 繼承自MessageToByteEncoder中,只需要將編碼后的數(shù)據(jù)寫入?yún)?shù)中指定的byteBuf中即可        // MessageToByteEncoder源碼邏輯中會自己去將byteBuf寫入channel的        MessageCodecUtil.messageEncode(messageProtocol,byteBuf);    }}

自定義解碼器: NettyDecoder

/** * netty 解碼器 */public class NettyDecoder extends ByteToMessageDecoder {    private static final Logger logger = LoggerFactory.getLogger(NettyDecoder.class);    @Override    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list){        // 保存讀取前的讀指針        int beforeReadIndex = byteBuf.readerIndex();        do{            try {                MessageDecodeResult messageDecodeResult = decodeHeader(byteBuf);                if (messageDecodeResult.isNeedMoreData()) {                    // 出現(xiàn)拆包沒有讀取到一個完整的rpc請求,還原byteBuf讀指針,等待下一次讀事件                    byteBuf.readerIndex(beforeReadIndex);                    break;                } else {                    // 正常解析完一個完整的message,交給后面的handler處理                    list.add(messageDecodeResult.getMessageProtocol());                }            }catch (Exception e){                // 比如decodeHeader里json序列化失敗了等等.直接跳過這個數(shù)據(jù)包不還原了                logger.error("NettyDecoder error!",e);            }            // 循環(huán),直到整個ByteBuf讀取完        }while(byteBuf.isReadable());    }        private MessageDecodeResult decodeHeader(ByteBuf byteBuf){        int readable = byteBuf.readableBytes();        if(readable < MessageHeader.MESSAGE_HEADER_LENGTH){            // 無法讀取到一個完整的header,說明出現(xiàn)了拆包,等待更多的數(shù)據(jù)            return MessageDecodeResult.needMoreData();        }        // 讀取header頭        MessageHeader messageHeader = MessageCodecUtil.messageHeaderDecode(byteBuf);        int bizDataLength = messageHeader.getBizDataLength();        if(byteBuf.readableBytes() < bizDataLength){            // 無法讀取到一個完整的正文內(nèi)容,說明出現(xiàn)了拆包,等待更多的數(shù)據(jù)            return MessageDecodeResult.needMoreData();        }        // 基于消息類型標識,解析rpc正文對象        boolean messageFlag = messageHeader.getMessageFlag();        if(messageFlag == MessageFlagEnums.REQUEST.getCode()){            RpcRequest rpcRequest = MessageCodecUtil.messageBizDataDecode(messageHeader,byteBuf,RpcRequest.class);            MessageProtocol messageProtocol = new MessageProtocol<>(messageHeader,rpcRequest);            // 正確的解析完一個rpc請求消息            return MessageDecodeResult.decodeSuccess(messageProtocol);        }else{            RpcResponse rpcResponse = MessageCodecUtil.messageBizDataDecode(messageHeader,byteBuf,RpcResponse.class);            MessageProtocol messageProtocol = new MessageProtocol<>(messageHeader,rpcResponse);            // 正確的解析完一個rpc響應(yīng)消息            return MessageDecodeResult.decodeSuccess(messageProtocol);        }    }}解決了黏包/拆包問題后的demo示例

demo的服務(wù)示例:

public class User implements Serializable {    private String name;    private Integer age;}
public interface UserService {    User getUserFriend(User user, String message);}
public class UserServiceImpl implements UserService {    @Override    public User getUserFriend(User user, String message) {        System.out.println("execute getUserFriend, user=" + user + ",message=" + message);        // demo返回一個不同的user對象回去        return new User(user.getName() + ".friend", user.getAge() + 1);    }}

netty服務(wù)端:

public class RpcServer {    private static final Map interfaceImplMap = new HashMap<>();    static{        /**         * 簡單一點配置死實現(xiàn)         * */        interfaceImplMap.put(UserService.class.getName(), new UserServiceImpl());    }    public static void main(String[] args) throws InterruptedException {        ServerBootstrap bootstrap = new ServerBootstrap();        EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));        EventLoopGroup workerGroup = new NioEventLoopGroup(8,new DefaultThreadFactory("NettyServerWorker", true));        bootstrap.group(bossGroup, workerGroup)            .channel(NioServerSocketChannel.class)            .childHandler(new ChannelInitializer() {                @Override                protected void initChannel(SocketChannel socketChannel) {                    socketChannel.pipeline()                        // 編碼、解碼處理器                        .addLast("encoder", new NettyEncoder<>())                        .addLast("decoder", new NettyDecoder())                        // 實際調(diào)用業(yè)務(wù)方法的處理器                        .addLast("serverHandler", new SimpleChannelInboundHandler>() {                            @Override                            protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) {                                // 找到本地的方法進行調(diào)用,并獲得返回值(demo,簡單起見直接同步調(diào)用)                                MessageProtocol result = handlerRpcRequest(msg);                                // 將返回值響應(yīng)給客戶端                                ctx.writeAndFlush(result);                            }                        });                }            });        ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 8888).sync();        System.out.println("netty server started!");        // 一直阻塞在這里        channelFuture.channel().closeFuture().sync();    }    private static MessageProtocol handlerRpcRequest(MessageProtocol rpcRequestMessageProtocol){        long requestMessageId = rpcRequestMessageProtocol.getMessageHeader().getMessageId();        MessageHeader messageHeader = new MessageHeader();        messageHeader.setMessageId(requestMessageId);        messageHeader.setMessageFlag(MessageFlagEnums.RESPONSE.getCode());        messageHeader.setTwoWayFlag(false);        messageHeader.setEventFlag(false);        messageHeader.setSerializeType(rpcRequestMessageProtocol.getMessageHeader().getSerializeType());        RpcResponse rpcResponse = new RpcResponse();        rpcResponse.setMessageId(requestMessageId);        try {            // 反射調(diào)用具體的實現(xiàn)方法            Object result = invokeTargetService(rpcRequestMessageProtocol.getBizDataBody());            // 設(shè)置返回值            rpcResponse.setReturnValue(result);        }catch (Exception e){            // 調(diào)用具體實現(xiàn)類時,出現(xiàn)異常,設(shè)置異常的值            rpcResponse.setExceptionValue(e);        }        return new MessageProtocol<>(messageHeader,rpcResponse);    }    private static Object invokeTargetService(RpcRequest rpcRequest) throws Exception {        String interfaceName = rpcRequest.getInterfaceName();        Object serviceImpl = interfaceImplMap.get(interfaceName);        // 按照請求里的方法名和參數(shù)列表找到對應(yīng)的方法        final Method method = serviceImpl.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterClasses());        // 傳遞參數(shù),反射調(diào)用該方法并返回結(jié)果        return method.invoke(serviceImpl, rpcRequest.getParams());    }}

netty客戶端:

public class RpcClientNoProxy {    public static void main(String[] args) throws InterruptedException {        Bootstrap bootstrap = new Bootstrap();        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(8,            new DefaultThreadFactory("NettyClientWorker", true));        bootstrap.group(eventLoopGroup)            .channel(NioSocketChannel.class)            .handler(new ChannelInitializer() {                @Override                protected void initChannel(SocketChannel socketChannel) {                    socketChannel.pipeline()                        // 編碼、解碼處理器                        .addLast("encoder", new NettyEncoder<>())                        .addLast("decoder", new NettyDecoder())                        .addLast("clientHandler", new SimpleChannelInboundHandler() {                            @Override                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol) {                                System.out.println("PureNettyClient received messageProtocol=" + messageProtocol);                            }                        })                    ;                }            });        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();        Channel channel = channelFuture.sync().channel();        // 構(gòu)造消息對象        MessageProtocol messageProtocol = buildMessage();        // 發(fā)送消息        channel.writeAndFlush(messageProtocol);        System.out.println("RpcClientNoProxy send request success!");        channelFuture.channel().closeFuture().sync();    }    private static MessageProtocol buildMessage(){        // 構(gòu)造請求        RpcRequest rpcRequest = new RpcRequest();        rpcRequest.setInterfaceName("myrpc.demo.common.service.UserService");        rpcRequest.setMethodName("getUserFriend");        rpcRequest.setParameterClasses(new Class[]{User.class,String.class});        User user = new User("Jerry",10);        String message = "hello hello!";        rpcRequest.setParams(new Object[]{user,message});        // 構(gòu)造協(xié)議頭        MessageHeader messageHeader = new MessageHeader();        messageHeader.setMessageFlag(MessageFlagEnums.REQUEST.getCode());        messageHeader.setTwoWayFlag(false);        messageHeader.setEventFlag(true);        messageHeader.setSerializeType(MessageSerializeType.JSON.getCode());        messageHeader.setMessageId(rpcRequest.getMessageId());        return new MessageProtocol<>(messageHeader,rpcRequest);    }}

netty處理流程圖

3.3 基于動態(tài)代理實現(xiàn)一個完整的點對點rpc功能

截止目前,我們已經(jīng)實現(xiàn)了一個點對點rpc客戶端/服務(wù)端交互的功能,但是客戶端這邊的邏輯依然比較復(fù)雜(buildMessage方法)。前面提到,rpc中很重要的功能就是保持本地調(diào)用時語義的簡潔性,即客戶端實際使用時是希望直接用以下這種方式來進行調(diào)用,而不是去繁瑣的處理底層的網(wǎng)絡(luò)交互邏輯。

User user = new User("Jerry",10);    String message = "hello hello!";    // 發(fā)起rpc調(diào)用并獲得返回值    User userFriend = userService.getUserFriend(user,message);    System.out.println("userService.getUserFriend result=" + userFriend);

rpc框架需要屏蔽掉構(gòu)造底層消息發(fā)送/接受,序列化/反序列化相關(guān)的復(fù)雜性,而這時候就需要引入代理模式(動態(tài)代理)了。在MyRpc的底層,我們將客戶端需要調(diào)用的一個服務(wù)(比如UserService)抽象為Consumer對象,服務(wù)端的一個具體服務(wù)實現(xiàn)抽象為Provider對象。其中包含了對應(yīng)的服務(wù)的類以及對應(yīng)的服務(wù)地址,客戶端這邊使用jdk的動態(tài)代理生成代理對象,將復(fù)雜的、需要屏蔽的消息處理/網(wǎng)絡(luò)交互等邏輯都封裝在這個代理對象中。

public class Consumer {    private Class interfaceClass;    private T proxy;    private Bootstrap bootstrap;    private URLAddress urlAddress;    public Consumer(Class interfaceClass, Bootstrap bootstrap, URLAddress urlAddress) {        this.interfaceClass = interfaceClass;        this.bootstrap = bootstrap;        this.urlAddress = urlAddress;        ClientDynamicProxy clientDynamicProxy = new ClientDynamicProxy(bootstrap,urlAddress);        this.proxy = (T) Proxy.newProxyInstance(            clientDynamicProxy.getClass().getClassLoader(),            new Class[]{interfaceClass},            clientDynamicProxy);    }    public T getProxy() {        return proxy;    }    public Class getInterfaceClass() {        return interfaceClass;    }}
public class ConsumerBootstrap {    private final Map,Consumer> consumerMap = new HashMap<>();    private final Bootstrap bootstrap;    private final URLAddress urlAddress;    public ConsumerBootstrap(Bootstrap bootstrap, URLAddress urlAddress) {        this.bootstrap = bootstrap;        this.urlAddress = urlAddress;    }    public  Consumer registerConsumer(Class clazz){        if(!consumerMap.containsKey(clazz)){            Consumer consumer = new Consumer<>(clazz,this.bootstrap,this.urlAddress);            consumerMap.put(clazz,consumer);            return consumer;        }        throw new MyRpcException("duplicate consumer! clazz=" + clazz);    }}
public class Provider {    private Class interfaceClass;    private T ref;    private URLAddress urlAddress;}
客戶端代理對象生成
/** * 客戶端動態(tài)代理 * */public class ClientDynamicProxy implements InvocationHandler {    private static final Logger logger = LoggerFactory.getLogger(ClientDynamicProxy.class);    private final Bootstrap bootstrap;    private final URLAddress urlAddress;    public ClientDynamicProxy(Bootstrap bootstrap, URLAddress urlAddress) {        this.bootstrap = bootstrap;        this.urlAddress = urlAddress;    }    @Override    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {        Object localMethodResult = processLocalMethod(proxy,method,args);        if(localMethodResult != null){            // 處理toString等對象自帶方法,不發(fā)起rpc調(diào)用            return localMethodResult;        }        logger.debug("ClientDynamicProxy before: methodName=" + method.getName());        // 構(gòu)造請求和協(xié)議頭        RpcRequest rpcRequest = new RpcRequest();        rpcRequest.setInterfaceName(method.getDeclaringClass().getName());        rpcRequest.setMethodName(method.getName());        rpcRequest.setParameterClasses(method.getParameterTypes());        rpcRequest.setParams(args);        MessageHeader messageHeader = new MessageHeader();        messageHeader.setMessageFlag(MessageFlagEnums.REQUEST.getCode());        messageHeader.setTwoWayFlag(false);        messageHeader.setEventFlag(true);        messageHeader.setSerializeType(GlobalConfig.messageSerializeType.getCode());        messageHeader.setResponseStatus((byte)"a");        messageHeader.setMessageId(rpcRequest.getMessageId());        logger.debug("ClientDynamicProxy rpcRequest={}", JsonUtil.obj2Str(rpcRequest));        ChannelFuture channelFuture = bootstrap.connect(urlAddress.getHost(),urlAddress.getPort()).sync();        Channel channel = channelFuture.sync().channel();        // 通過Promise,將netty的異步轉(zhuǎn)為同步,參考dubbo DefaultFuture        DefaultFuture defaultFuture = DefaultFutureManager.createNewFuture(channel,rpcRequest);        channel.writeAndFlush(new MessageProtocol<>(messageHeader,rpcRequest));        logger.debug("ClientDynamicProxy writeAndFlush success, wait result");        // 調(diào)用方阻塞在這里        RpcResponse rpcResponse = defaultFuture.get();        logger.debug("ClientDynamicProxy defaultFuture.get() rpcResponse={}",rpcResponse);        return processRpcResponse(rpcResponse);    }    private Object processLocalMethod(Object proxy, Method method, Object[] args) throws Exception {        // 處理toString等對象自帶方法,不發(fā)起rpc調(diào)用        if (method.getDeclaringClass() == Object.class) {            return method.invoke(proxy, args);        }        String methodName = method.getName();        Class[] parameterTypes = method.getParameterTypes();        if (parameterTypes.length == 0) {            if ("toString".equals(methodName)) {                return proxy.toString();            } else if ("hashCode".equals(methodName)) {                return proxy.hashCode();            }        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {            return proxy.equals(args[0]);        }        // 返回null標識非本地方法,需要進行rpc調(diào)用        return null;    }    private Object processRpcResponse(RpcResponse rpcResponse){        if(rpcResponse.getExceptionValue() == null){            // 沒有異常,return正常的返回值            return rpcResponse.getReturnValue();        }else{            // 有異常,往外拋出去            throw new MyRpcRemotingException(rpcResponse.getExceptionValue());        }    }}
客戶端接收響應(yīng)處理(通過DefaultFuture實現(xiàn)異步轉(zhuǎn)同步)
/** * 客戶端 rpc響應(yīng)處理器 */public class NettyRpcResponseHandler extends SimpleChannelInboundHandler> {    private static final Logger logger = LoggerFactory.getLogger(NettyRpcResponseHandler.class);    @Override    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol rpcResponseMessageProtocol) throws Exception {        logger.debug("NettyRpcResponseHandler channelRead0={}",JsonUtil.obj2Str(rpcResponseMessageProtocol));        // 觸發(fā)客戶端的future,令其同步阻塞的線程得到結(jié)果        DefaultFutureManager.received(rpcResponseMessageProtocol.getBizDataBody());    }}
public class DefaultFutureManager {    private static Logger logger = LoggerFactory.getLogger(DefaultFutureManager.class);    public static final Map DEFAULT_FUTURE_CACHE = new ConcurrentHashMap<>();    public static void received(RpcResponse rpcResponse){        Long messageId = rpcResponse.getMessageId();        logger.debug("received rpcResponse={},DEFAULT_FUTURE_CACHE={}",rpcResponse,DEFAULT_FUTURE_CACHE);        DefaultFuture defaultFuture = DEFAULT_FUTURE_CACHE.remove(messageId);        if(defaultFuture != null){            logger.debug("remove defaultFuture success");            if(rpcResponse.getExceptionValue() != null){                // 異常處理                defaultFuture.completeExceptionally(rpcResponse.getExceptionValue());            }else{                // 正常返回                defaultFuture.complete(rpcResponse);            }        }else{            logger.debug("remove defaultFuture fail");        }    }    public static DefaultFuture createNewFuture(Channel channel, RpcRequest rpcRequest){        DefaultFuture defaultFuture = new DefaultFuture(channel,rpcRequest);        return defaultFuture;    }}
代理模式下點對點rpc的客戶端demo
public class RpcClientProxy {    public static void main(String[] args) throws InterruptedException {        Bootstrap bootstrap = new Bootstrap();        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(8, new DefaultThreadFactory("NettyClientWorker", true));        bootstrap.group(eventLoopGroup)            .channel(NioSocketChannel.class)            .handler(new ChannelInitializer() {                @Override                protected void initChannel(SocketChannel socketChannel) {                    socketChannel.pipeline()                        // 編碼、解碼處理器                        .addLast("encoder", new NettyEncoder<>())                        .addLast("decoder", new NettyDecoder())                        // 響應(yīng)處理器                        .addLast("clientHandler", new NettyRpcResponseHandler())                    ;                }            });        ConsumerBootstrap consumerBootstrap = new ConsumerBootstrap(bootstrap, new URLAddress("127.0.0.1", 8888));        Consumer userServiceConsumer = consumerBootstrap.registerConsumer(UserService.class);        // 獲得UserService的代理對象        UserService userService = userServiceConsumer.getProxy();        User user = new User("Jerry", 10);        String message = "hello hello!";        // 發(fā)起rpc調(diào)用并獲得返回值        User userFriend = userService.getUserFriend(user, message);        System.out.println("userService.getUserFriend result=" + userFriend);    }}

可以看到,引入了代理模式后的使用方式就變得簡單很多了。到這一步,我們已經(jīng)實現(xiàn)了一個點對點的rpc通信的能力,并且如博客開頭中所提到的,沒有喪失本地調(diào)用語義的簡潔性。

總結(jié)這篇博客是我關(guān)于Mit6.824分布式系統(tǒng)公開課lab的第一篇博客,按照計劃會將實現(xiàn)簡易版rpc和raft k/v數(shù)據(jù)庫的心得以博客的形式分享出來,希望能幫助到對分布式系統(tǒng)相關(guān)技術(shù)的小伙伴。打個廣告:對于英語不好(沒法直接啃生肉)但又對國外著名的計算機公開課(涉及操作系統(tǒng)、數(shù)據(jù)庫、分布式系統(tǒng)、編譯原理、計算機網(wǎng)絡(luò)、算法等等)感興趣的小伙伴,可以咨詢simviso購買中英翻譯質(zhì)量很高的公開課視頻(比如Mit6.824,b站上開放了一部分免費的視頻:https://www.bilibili.com/video/BV1x7411M7Sf)。博客中展示的完整代碼在我的github上:https://github.com/1399852153/MyRpc (release/lab1分支),內(nèi)容如有錯誤,還請多多指教。

關(guān)鍵詞: