RPC是遠過程調(diào)用(Remote Procedure Call)的縮寫形式,其區(qū)別于一個程序內(nèi)部基本的過程調(diào)用(或者叫函數(shù)/方法調(diào)用)。
隨著應(yīng)用程序變得越來越復(fù)雜,在單個機器上中僅通過一個進程來運行整個應(yīng)用程序的方式已經(jīng)難以滿足現(xiàn)實中日益增長的需求。開發(fā)者對應(yīng)用程序進行模塊化的拆分,以分布式部署的方式來降低程序整體的復(fù)雜度和提升性能方面的可拓展性(分而治之的思想)。
(資料圖)
拆分后部署在不同機器上的各個模塊無法像之前那樣通過內(nèi)存尋址的方式來互相訪問,而是需要通過網(wǎng)絡(luò)來進行通信。RPC最主要的功能就是在提供不同模塊服務(wù)間的網(wǎng)絡(luò)通信能力的同時,又盡可能的不丟失本地調(diào)用時語義的簡潔性。rpc可以認為是分布式系統(tǒng)中類似人體經(jīng)絡(luò)一樣的基礎(chǔ)設(shè)施,因此有必要對其工作原理有一定的了解。
2. MyRpc介紹要學(xué)習(xí)rpc的原理,理論上最好的辦法就是去看流行的開源框架源碼。但dubbo這樣成熟的rpc框架由于已經(jīng)迭代了很多年,為了滿足多樣的需求而有著復(fù)雜的架構(gòu)和龐大的代碼量。對于普通初學(xué)者來說往往很難從層層抽象封裝中把握住關(guān)于rpc框架最核心的內(nèi)容。
MyRpc是我最近在學(xué)習(xí)MIT6.824分布式系統(tǒng)公開課時,使用java并基于netty實現(xiàn)的一個簡易rpc框架,實現(xiàn)的過程中許多地方都參考了dubbo以及一些demo級別的rpc框架。MyRpc是demo級別的框架,理解起來會輕松不少。在對基礎(chǔ)的rpc實現(xiàn)原理有一定了解后,能對后續(xù)研究dubbo等開源rpc框架帶來很大的幫助。
目前MyRpc實現(xiàn)了以下功能
網(wǎng)絡(luò)通信(netty做客戶端、服務(wù)端網(wǎng)絡(luò)交互,服務(wù)端使用一個線程池處理業(yè)務(wù)邏輯)實現(xiàn)消息的序列化(實現(xiàn)序列化方式的抽象,支持json、hessian、jdk序列化等)客戶端代理生成(目前只實現(xiàn)了jdk動態(tài)代理)服務(wù)注冊 + 注冊中心集成(實現(xiàn)注冊中心的抽象,但目前只支持用zookeeper做注冊中心)集群負載均衡策略(實現(xiàn)負載均衡策略的抽象,支持roundRobin輪訓(xùn),隨機等)使用時間輪,支持設(shè)置消費者調(diào)用超時時間限于篇幅,以上功能會拆分為兩篇博客分別介紹。其中前3個功能實現(xiàn)了基本的點對點通信的rpc功能,將在本篇博客中結(jié)合源碼詳細分析。
MyRpc架構(gòu)圖
3. MyRpc源碼分析3.1 基于netty的極簡客戶端/服務(wù)端交互demoMyRpc是以netty為基礎(chǔ)的,下面展示一個最基礎(chǔ)的netty客戶端/服務(wù)端交互的demo。
netty服務(wù)端:
/** * 最原始的netty服務(wù)端demo * */public class PureNettyServer { public static void main(String[] args) throws InterruptedException { ServerBootstrap bootstrap = new ServerBootstrap(); EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); EventLoopGroup workerGroup = new NioEventLoopGroup(8,new DefaultThreadFactory("NettyServerWorker", true)); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) { socketChannel.pipeline() // 實際調(diào)用業(yè)務(wù)方法的處理器 .addLast("serverHandler", new SimpleChannelInboundHandler() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf requestByteBuf) { String requestStr = requestByteBuf.toString(CharsetUtil.UTF_8); System.out.println("PureNettyServer read request=" + JsonUtil.json2Obj(requestStr, User.class)); // 服務(wù)端響應(yīng)echo ByteBuf byteBuf = Unpooled.copiedBuffer("echo:" + requestStr,CharsetUtil.UTF_8); channelHandlerContext.writeAndFlush(byteBuf); } }) ; } }); ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 8888).sync(); System.out.println("netty server started!"); // 一直阻塞在這里 channelFuture.channel().closeFuture().sync(); }}
netty客戶端:
/** * 最原始的netty客戶端demo * */public class PureNettyClient { public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(8, new DefaultThreadFactory("NettyClientWorker", true)); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) { socketChannel.pipeline() .addLast("clientHandler", new SimpleChannelInboundHandler() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf responseByteBuf) { String responseStr = responseByteBuf.toString(CharsetUtil.UTF_8); System.out.println("PureNettyClient received response=" + responseStr); } }) ; } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync(); Channel channel = channelFuture.sync().channel(); // 發(fā)送一個user對象的json串 User user = new User("Tom",10); ByteBuf requestByteBuf = Unpooled.copiedBuffer(JsonUtil.obj2Str(user), CharsetUtil.UTF_8); channel.writeAndFlush(requestByteBuf); System.out.println("netty client send request success!"); channelFuture.channel().closeFuture().sync(); }}
demo示例中,netty的服務(wù)端啟動后綁定在本機127.0.0.1的8888端口上,等待來自客戶端的連接。netty客戶端向服務(wù)端發(fā)起連接請求,在成功建立連接后向服務(wù)端發(fā)送了一個User對象字符串對應(yīng)的字節(jié)數(shù)組。服務(wù)端在接受到這一字節(jié)數(shù)組后反序列化為User對象并打印在控制臺,隨后echo響應(yīng)了一個字符串??蛻舳嗽诮邮艿巾憫?yīng)后,將echo字符串打印在了控制臺上3.2 設(shè)計MyRpc通信協(xié)議,解決黏包/拆包問題上面展示了一個最基礎(chǔ)的netty網(wǎng)絡(luò)通信的demo,似乎一個點對點的傳輸功能已經(jīng)得到了良好的實現(xiàn)。但作為一個rpc框架,還需要解決tcp傳輸層基于字節(jié)流的消息黏包/拆包問題。
黏包/拆包問題介紹操作系統(tǒng)實現(xiàn)的傳輸層tcp協(xié)議中,向上層的應(yīng)用保證盡最大可能的(best effort delivery)、可靠的傳輸字節(jié)流,但并不關(guān)心實際傳輸?shù)臄?shù)據(jù)包是否總是符合應(yīng)用層的要求。
黏包問題:假設(shè)應(yīng)用層發(fā)送的一次請求數(shù)據(jù)量比較小(比如0.1kb),tcp層可能不會在接到應(yīng)用請求后立即進行傳輸,而是會稍微等待一小會。這樣如果應(yīng)用層在短時間內(nèi)需要傳輸多次0.1kb的請求,就可以攢在一起批量傳輸,傳輸效率會高很多。但這帶來的問題就是接收端一次接受到的數(shù)據(jù)包內(nèi)應(yīng)用程序邏輯上的多次請求黏連在了一起,需要通過一些方法來將其拆分還原為一個個獨立的信息給應(yīng)用層。拆包問題:假設(shè)應(yīng)用層發(fā)送的一次請求數(shù)據(jù)量比較大(比如100Mb),而tcp層的數(shù)據(jù)包容量的最大值是有限的,所以應(yīng)用層較大的一次請求數(shù)據(jù)會被拆分為多個包分開發(fā)送。這就導(dǎo)致接收端接受到的某個數(shù)據(jù)包其實并不是完整的應(yīng)用層請求數(shù)據(jù),沒法直接交給應(yīng)用程序去使用,而必須等待后續(xù)對應(yīng)請求的所有數(shù)據(jù)包都接受完成后,才能組裝成完整的請求對象再交給應(yīng)用層處理??梢钥吹?,上述的黏包/拆包問題并不能看做是tcp的問題,而是應(yīng)用層最終需求與tcp傳輸層功能不匹配導(dǎo)致的問題。tcp出于傳輸效率的考慮無法解決這個問題,所以黏包拆包問題最終只能在更上面的應(yīng)用層自己來處理。一個數(shù)據(jù)包中可能同時存在黏包問題和拆包問題(如下圖所示)黏包拆包示意圖
黏包/拆包問題解決方案解決黏包/拆包問題最核心的思路是,如何知道一個應(yīng)用層完整請求的邊界。對于黏包問題,基于邊界可以獨立的拆分出每一個請求;對于拆包問題,如果發(fā)現(xiàn)收到的數(shù)據(jù)包末尾沒有邊界,則繼續(xù)等待新的數(shù)據(jù)包,直到發(fā)現(xiàn)邊界后再一并上交給應(yīng)用程序。
主流的解決黏包拆包的應(yīng)用層協(xié)議設(shè)計方案有三種:
介紹 | 優(yōu)點 | 缺點 | |
---|---|---|---|
1.基于固定長度的協(xié)議 | 每個消息都是固定的大小,如果實際上小于固定值,則需要填充 | 簡單;易于實現(xiàn) | 固定值過大,填充會浪費大量傳輸帶寬;固定值過小則限制了可用的消息體大小 |
2.基于特殊分隔符的協(xié)議 | 約定一個特殊的分隔符,以這個分割符為消息邊界 | 簡單;且消息體長度是可變的,性能好 | 消息體的業(yè)務(wù)數(shù)據(jù)不允許包含這個特殊分隔符,否則會錯誤的拆分數(shù)據(jù)包。因此兼容性較差 |
3.基于業(yè)務(wù)數(shù)據(jù)長度編碼的協(xié)議 | 設(shè)計一個固定大小的消息請求頭(比如固定16字節(jié)、20字節(jié)大小),在消息請求頭中包含實際的業(yè)務(wù)消息體長度 | 消息體長度可變,性能好;對業(yè)務(wù)數(shù)據(jù)內(nèi)容無限制,兼容性也好 | 實現(xiàn)起來稍顯復(fù)雜 |
對于流行的rpc框架,一般都是選用性能與兼容性皆有的方案3:即自己設(shè)計一個固定大小的、包含了請求體長度字段的請求頭。MyRpc參考dubbo,也設(shè)計了一個固定16字節(jié)大小的請求頭(里面有幾個字段暫時沒用上)。
請求頭: MessageHeader
/** * 共16字節(jié)的請求頭 * */public class MessageHeader implements Serializable { public static final int MESSAGE_HEADER_LENGTH = 16; public static final int MESSAGE_SERIALIZE_TYPE_LENGTH = 5; public static final short MAGIC = (short)0x2233; // ================================ 消息頭 ================================= /** * 魔數(shù)(占2字節(jié)) * */ private short magicNumber = MAGIC; /** * 消息標識(0代表請求事件;1代表響應(yīng)事件, 占1位) * @see MessageFlagEnums * */ private Boolean messageFlag; /** * 是否是雙向請求(0代表oneWay請求;1代表twoWay請求) * (雙向代表客戶端會等待服務(wù)端的響應(yīng),單向則請求發(fā)送完成后即向上層返回成功) * */ private Boolean twoWayFlag; /** * 是否是心跳消息(0代表正常消息;1代表心跳消息, 占1位) * */ private Boolean eventFlag; /** * 消息體序列化類型(占5位,即所支持的序列化類型不得超過2的5次方,32種) * @see MessageSerializeType * */ private Boolean[] serializeType; /** * 響應(yīng)狀態(tài)(占1字節(jié)) * */ private byte responseStatus; /** * 消息的唯一id(占8字節(jié)) * */ private long messageId; /** * 業(yè)務(wù)數(shù)據(jù)長度(占4字節(jié)) * */ private int bizDataLength;}
完整的消息對象: MessageProtocol
public class MessageProtocol implements Serializable { /** * 請求頭 * */ private MessageHeader messageHeader; /** * 請求體(實際的業(yè)務(wù)消息對象) * */ private T bizDataBody;}
MyRpc消息示例圖
rpc請求/響應(yīng)對象/** * rpc請求對象 * */public class RpcRequest implements Serializable { private static final AtomicLong INVOKE_ID = new AtomicLong(0); /** * 消息的唯一id(占8字節(jié)) * */ private final long messageId; /** * 接口名 * */ private String interfaceName; /** * 方法名 * */ private String methodName; /** * 參數(shù)類型數(shù)組(每個參數(shù)一項) * */ private Class>[] parameterClasses; /** * 實際參數(shù)對象數(shù)組(每個參數(shù)一項) * */ private Object[] params; public RpcRequest() { // 每個請求對象生成時都自動生成單機全局唯一的自增id this.messageId = INVOKE_ID.getAndIncrement(); }}
/** * rpc響應(yīng)對象 * */public class RpcResponse implements Serializable { /** * 消息的唯一id(占8字節(jié)) * */ private long messageId; /** * 返回值 */ private Object returnValue; /** * 異常值 */ private Exception exceptionValue;}
處理自定義消息的netty編解碼器在上一節(jié)的netty demo中的消息處理器中,一共做了兩件事情;一是將原始數(shù)據(jù)包的字節(jié)流轉(zhuǎn)化成了應(yīng)用程序所需的String對象;二是拿到String對象后進行響應(yīng)的業(yè)務(wù)處理(比如打印在控制臺上)。而netty框架允許配置多個消息處理器組成鏈條,按約定的順序處理出站/入站的消息;因此從模塊化的出發(fā),應(yīng)該將編碼/解碼的邏輯和實際業(yè)務(wù)的處理拆分成多個處理器。
在自定義的消息編碼器、解碼器中進行應(yīng)用層請求/響應(yīng)數(shù)據(jù)的序列化/反序列化,同時處理上述的黏包/拆包問題。
編解碼工具類
public class MessageCodecUtil { /** * 報文協(xié)議編碼 * */ public static void messageEncode(MessageProtocol messageProtocol, ByteBuf byteBuf) { MessageHeader messageHeader = messageProtocol.getMessageHeader(); // 寫入魔數(shù) byteBuf.writeShort(MessageHeader.MAGIC); // 寫入消息標識 byteBuf.writeBoolean(messageHeader.getMessageFlag()); // 寫入單/雙向標識 byteBuf.writeBoolean(messageHeader.getTwoWayFlag()); // 寫入消息事件標識 byteBuf.writeBoolean(messageHeader.getEventFlag()); // 寫入序列化類型 for(boolean b : messageHeader.getSerializeType()){ byteBuf.writeBoolean(b); } // 寫入響應(yīng)狀態(tài) byteBuf.writeByte(messageHeader.getResponseStatus()); // 寫入消息uuid byteBuf.writeLong(messageHeader.getMessageId()); // 序列化消息體 MyRpcSerializer myRpcSerializer = MyRpcSerializerManager.getSerializer(messageHeader.getSerializeType()); byte[] bizMessageBytes = myRpcSerializer.serialize(messageProtocol.getBizDataBody()); // 獲得并寫入消息正文長度 byteBuf.writeInt(bizMessageBytes.length); // 寫入消息正文內(nèi)容 byteBuf.writeBytes(bizMessageBytes); } /** * 報文協(xié)議header頭解碼 * */ public static MessageHeader messageHeaderDecode(ByteBuf byteBuf){ MessageHeader messageHeader = new MessageHeader(); // 讀取魔數(shù) messageHeader.setMagicNumber(byteBuf.readShort()); // 讀取消息標識 messageHeader.setMessageFlag(byteBuf.readBoolean()); // 讀取單/雙向標識 messageHeader.setTwoWayFlag(byteBuf.readBoolean()); // 讀取消息事件標識 messageHeader.setEventFlag(byteBuf.readBoolean()); // 讀取序列化類型 Boolean[] serializeTypeBytes = new Boolean[MessageHeader.MESSAGE_SERIALIZE_TYPE_LENGTH]; for(int i=0; i T messageBizDataDecode(MessageHeader messageHeader, ByteBuf byteBuf, Class messageBizDataType){ // 讀取消息正文 byte[] bizDataBytes = new byte[messageHeader.getBizDataLength()]; byteBuf.readBytes(bizDataBytes); // 反序列化消息體 MyRpcSerializer myRpcSerializer = MyRpcSerializerManager.getSerializer(messageHeader.getSerializeType()); return (T) myRpcSerializer.deserialize(bizDataBytes,messageBizDataType); }}
自定義編碼器: NettyEncoder
public class NettyEncoder extends MessageToByteEncoder> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol, ByteBuf byteBuf) { // 繼承自MessageToByteEncoder中,只需要將編碼后的數(shù)據(jù)寫入?yún)?shù)中指定的byteBuf中即可 // MessageToByteEncoder源碼邏輯中會自己去將byteBuf寫入channel的 MessageCodecUtil.messageEncode(messageProtocol,byteBuf); }}
自定義解碼器: NettyDecoder
/** * netty 解碼器 */public class NettyDecoder extends ByteToMessageDecoder { private static final Logger logger = LoggerFactory.getLogger(NettyDecoder.class); @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List
解決了黏包/拆包問題后的demo示例demo的服務(wù)示例:
public class User implements Serializable { private String name; private Integer age;}
public interface UserService { User getUserFriend(User user, String message);}
public class UserServiceImpl implements UserService { @Override public User getUserFriend(User user, String message) { System.out.println("execute getUserFriend, user=" + user + ",message=" + message); // demo返回一個不同的user對象回去 return new User(user.getName() + ".friend", user.getAge() + 1); }}
netty服務(wù)端:
public class RpcServer { private static final Map interfaceImplMap = new HashMap<>(); static{ /** * 簡單一點配置死實現(xiàn) * */ interfaceImplMap.put(UserService.class.getName(), new UserServiceImpl()); } public static void main(String[] args) throws InterruptedException { ServerBootstrap bootstrap = new ServerBootstrap(); EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); EventLoopGroup workerGroup = new NioEventLoopGroup(8,new DefaultThreadFactory("NettyServerWorker", true)); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) { socketChannel.pipeline() // 編碼、解碼處理器 .addLast("encoder", new NettyEncoder<>()) .addLast("decoder", new NettyDecoder()) // 實際調(diào)用業(yè)務(wù)方法的處理器 .addLast("serverHandler", new SimpleChannelInboundHandler>() { @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) { // 找到本地的方法進行調(diào)用,并獲得返回值(demo,簡單起見直接同步調(diào)用) MessageProtocol result = handlerRpcRequest(msg); // 將返回值響應(yīng)給客戶端 ctx.writeAndFlush(result); } }); } }); ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", 8888).sync(); System.out.println("netty server started!"); // 一直阻塞在這里 channelFuture.channel().closeFuture().sync(); } private static MessageProtocol handlerRpcRequest(MessageProtocol rpcRequestMessageProtocol){ long requestMessageId = rpcRequestMessageProtocol.getMessageHeader().getMessageId(); MessageHeader messageHeader = new MessageHeader(); messageHeader.setMessageId(requestMessageId); messageHeader.setMessageFlag(MessageFlagEnums.RESPONSE.getCode()); messageHeader.setTwoWayFlag(false); messageHeader.setEventFlag(false); messageHeader.setSerializeType(rpcRequestMessageProtocol.getMessageHeader().getSerializeType()); RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setMessageId(requestMessageId); try { // 反射調(diào)用具體的實現(xiàn)方法 Object result = invokeTargetService(rpcRequestMessageProtocol.getBizDataBody()); // 設(shè)置返回值 rpcResponse.setReturnValue(result); }catch (Exception e){ // 調(diào)用具體實現(xiàn)類時,出現(xiàn)異常,設(shè)置異常的值 rpcResponse.setExceptionValue(e); } return new MessageProtocol<>(messageHeader,rpcResponse); } private static Object invokeTargetService(RpcRequest rpcRequest) throws Exception { String interfaceName = rpcRequest.getInterfaceName(); Object serviceImpl = interfaceImplMap.get(interfaceName); // 按照請求里的方法名和參數(shù)列表找到對應(yīng)的方法 final Method method = serviceImpl.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterClasses()); // 傳遞參數(shù),反射調(diào)用該方法并返回結(jié)果 return method.invoke(serviceImpl, rpcRequest.getParams()); }}
netty客戶端:
public class RpcClientNoProxy { public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(8, new DefaultThreadFactory("NettyClientWorker", true)); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) { socketChannel.pipeline() // 編碼、解碼處理器 .addLast("encoder", new NettyEncoder<>()) .addLast("decoder", new NettyDecoder()) .addLast("clientHandler", new SimpleChannelInboundHandler() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol messageProtocol) { System.out.println("PureNettyClient received messageProtocol=" + messageProtocol); } }) ; } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync(); Channel channel = channelFuture.sync().channel(); // 構(gòu)造消息對象 MessageProtocol messageProtocol = buildMessage(); // 發(fā)送消息 channel.writeAndFlush(messageProtocol); System.out.println("RpcClientNoProxy send request success!"); channelFuture.channel().closeFuture().sync(); } private static MessageProtocol buildMessage(){ // 構(gòu)造請求 RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setInterfaceName("myrpc.demo.common.service.UserService"); rpcRequest.setMethodName("getUserFriend"); rpcRequest.setParameterClasses(new Class[]{User.class,String.class}); User user = new User("Jerry",10); String message = "hello hello!"; rpcRequest.setParams(new Object[]{user,message}); // 構(gòu)造協(xié)議頭 MessageHeader messageHeader = new MessageHeader(); messageHeader.setMessageFlag(MessageFlagEnums.REQUEST.getCode()); messageHeader.setTwoWayFlag(false); messageHeader.setEventFlag(true); messageHeader.setSerializeType(MessageSerializeType.JSON.getCode()); messageHeader.setMessageId(rpcRequest.getMessageId()); return new MessageProtocol<>(messageHeader,rpcRequest); }}
netty處理流程圖
3.3 基于動態(tài)代理實現(xiàn)一個完整的點對點rpc功能截止目前,我們已經(jīng)實現(xiàn)了一個點對點rpc客戶端/服務(wù)端交互的功能,但是客戶端這邊的邏輯依然比較復(fù)雜(buildMessage方法)。前面提到,rpc中很重要的功能就是保持本地調(diào)用時語義的簡潔性,即客戶端實際使用時是希望直接用以下這種方式來進行調(diào)用,而不是去繁瑣的處理底層的網(wǎng)絡(luò)交互邏輯。
User user = new User("Jerry",10); String message = "hello hello!"; // 發(fā)起rpc調(diào)用并獲得返回值 User userFriend = userService.getUserFriend(user,message); System.out.println("userService.getUserFriend result=" + userFriend);
rpc框架需要屏蔽掉構(gòu)造底層消息發(fā)送/接受,序列化/反序列化相關(guān)的復(fù)雜性,而這時候就需要引入代理模式(動態(tài)代理)了。在MyRpc的底層,我們將客戶端需要調(diào)用的一個服務(wù)(比如UserService)抽象為Consumer對象,服務(wù)端的一個具體服務(wù)實現(xiàn)抽象為Provider對象。其中包含了對應(yīng)的服務(wù)的類以及對應(yīng)的服務(wù)地址,客戶端這邊使用jdk的動態(tài)代理生成代理對象,將復(fù)雜的、需要屏蔽的消息處理/網(wǎng)絡(luò)交互等邏輯都封裝在這個代理對象中。
public class Consumer { private Class> interfaceClass; private T proxy; private Bootstrap bootstrap; private URLAddress urlAddress; public Consumer(Class> interfaceClass, Bootstrap bootstrap, URLAddress urlAddress) { this.interfaceClass = interfaceClass; this.bootstrap = bootstrap; this.urlAddress = urlAddress; ClientDynamicProxy clientDynamicProxy = new ClientDynamicProxy(bootstrap,urlAddress); this.proxy = (T) Proxy.newProxyInstance( clientDynamicProxy.getClass().getClassLoader(), new Class[]{interfaceClass}, clientDynamicProxy); } public T getProxy() { return proxy; } public Class> getInterfaceClass() { return interfaceClass; }}
public class ConsumerBootstrap { private final Map,Consumer>> consumerMap = new HashMap<>(); private final Bootstrap bootstrap; private final URLAddress urlAddress; public ConsumerBootstrap(Bootstrap bootstrap, URLAddress urlAddress) { this.bootstrap = bootstrap; this.urlAddress = urlAddress; } public Consumer registerConsumer(Class clazz){ if(!consumerMap.containsKey(clazz)){ Consumer consumer = new Consumer<>(clazz,this.bootstrap,this.urlAddress); consumerMap.put(clazz,consumer); return consumer; } throw new MyRpcException("duplicate consumer! clazz=" + clazz); }}
public class Provider { private Class> interfaceClass; private T ref; private URLAddress urlAddress;}
客戶端代理對象生成/** * 客戶端動態(tài)代理 * */public class ClientDynamicProxy implements InvocationHandler { private static final Logger logger = LoggerFactory.getLogger(ClientDynamicProxy.class); private final Bootstrap bootstrap; private final URLAddress urlAddress; public ClientDynamicProxy(Bootstrap bootstrap, URLAddress urlAddress) { this.bootstrap = bootstrap; this.urlAddress = urlAddress; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object localMethodResult = processLocalMethod(proxy,method,args); if(localMethodResult != null){ // 處理toString等對象自帶方法,不發(fā)起rpc調(diào)用 return localMethodResult; } logger.debug("ClientDynamicProxy before: methodName=" + method.getName()); // 構(gòu)造請求和協(xié)議頭 RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setInterfaceName(method.getDeclaringClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameterClasses(method.getParameterTypes()); rpcRequest.setParams(args); MessageHeader messageHeader = new MessageHeader(); messageHeader.setMessageFlag(MessageFlagEnums.REQUEST.getCode()); messageHeader.setTwoWayFlag(false); messageHeader.setEventFlag(true); messageHeader.setSerializeType(GlobalConfig.messageSerializeType.getCode()); messageHeader.setResponseStatus((byte)"a"); messageHeader.setMessageId(rpcRequest.getMessageId()); logger.debug("ClientDynamicProxy rpcRequest={}", JsonUtil.obj2Str(rpcRequest)); ChannelFuture channelFuture = bootstrap.connect(urlAddress.getHost(),urlAddress.getPort()).sync(); Channel channel = channelFuture.sync().channel(); // 通過Promise,將netty的異步轉(zhuǎn)為同步,參考dubbo DefaultFuture DefaultFuture defaultFuture = DefaultFutureManager.createNewFuture(channel,rpcRequest); channel.writeAndFlush(new MessageProtocol<>(messageHeader,rpcRequest)); logger.debug("ClientDynamicProxy writeAndFlush success, wait result"); // 調(diào)用方阻塞在這里 RpcResponse rpcResponse = defaultFuture.get(); logger.debug("ClientDynamicProxy defaultFuture.get() rpcResponse={}",rpcResponse); return processRpcResponse(rpcResponse); } private Object processLocalMethod(Object proxy, Method method, Object[] args) throws Exception { // 處理toString等對象自帶方法,不發(fā)起rpc調(diào)用 if (method.getDeclaringClass() == Object.class) { return method.invoke(proxy, args); } String methodName = method.getName(); Class>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 0) { if ("toString".equals(methodName)) { return proxy.toString(); } else if ("hashCode".equals(methodName)) { return proxy.hashCode(); } } else if (parameterTypes.length == 1 && "equals".equals(methodName)) { return proxy.equals(args[0]); } // 返回null標識非本地方法,需要進行rpc調(diào)用 return null; } private Object processRpcResponse(RpcResponse rpcResponse){ if(rpcResponse.getExceptionValue() == null){ // 沒有異常,return正常的返回值 return rpcResponse.getReturnValue(); }else{ // 有異常,往外拋出去 throw new MyRpcRemotingException(rpcResponse.getExceptionValue()); } }}
客戶端接收響應(yīng)處理(通過DefaultFuture實現(xiàn)異步轉(zhuǎn)同步)/** * 客戶端 rpc響應(yīng)處理器 */public class NettyRpcResponseHandler extends SimpleChannelInboundHandler> { private static final Logger logger = LoggerFactory.getLogger(NettyRpcResponseHandler.class); @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProtocol rpcResponseMessageProtocol) throws Exception { logger.debug("NettyRpcResponseHandler channelRead0={}",JsonUtil.obj2Str(rpcResponseMessageProtocol)); // 觸發(fā)客戶端的future,令其同步阻塞的線程得到結(jié)果 DefaultFutureManager.received(rpcResponseMessageProtocol.getBizDataBody()); }}
public class DefaultFutureManager { private static Logger logger = LoggerFactory.getLogger(DefaultFutureManager.class); public static final Map DEFAULT_FUTURE_CACHE = new ConcurrentHashMap<>(); public static void received(RpcResponse rpcResponse){ Long messageId = rpcResponse.getMessageId(); logger.debug("received rpcResponse={},DEFAULT_FUTURE_CACHE={}",rpcResponse,DEFAULT_FUTURE_CACHE); DefaultFuture defaultFuture = DEFAULT_FUTURE_CACHE.remove(messageId); if(defaultFuture != null){ logger.debug("remove defaultFuture success"); if(rpcResponse.getExceptionValue() != null){ // 異常處理 defaultFuture.completeExceptionally(rpcResponse.getExceptionValue()); }else{ // 正常返回 defaultFuture.complete(rpcResponse); } }else{ logger.debug("remove defaultFuture fail"); } } public static DefaultFuture createNewFuture(Channel channel, RpcRequest rpcRequest){ DefaultFuture defaultFuture = new DefaultFuture(channel,rpcRequest); return defaultFuture; }}
代理模式下點對點rpc的客戶端demopublic class RpcClientProxy { public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(8, new DefaultThreadFactory("NettyClientWorker", true)); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) { socketChannel.pipeline() // 編碼、解碼處理器 .addLast("encoder", new NettyEncoder<>()) .addLast("decoder", new NettyDecoder()) // 響應(yīng)處理器 .addLast("clientHandler", new NettyRpcResponseHandler()) ; } }); ConsumerBootstrap consumerBootstrap = new ConsumerBootstrap(bootstrap, new URLAddress("127.0.0.1", 8888)); Consumer userServiceConsumer = consumerBootstrap.registerConsumer(UserService.class); // 獲得UserService的代理對象 UserService userService = userServiceConsumer.getProxy(); User user = new User("Jerry", 10); String message = "hello hello!"; // 發(fā)起rpc調(diào)用并獲得返回值 User userFriend = userService.getUserFriend(user, message); System.out.println("userService.getUserFriend result=" + userFriend); }}
可以看到,引入了代理模式后的使用方式就變得簡單很多了。到這一步,我們已經(jīng)實現(xiàn)了一個點對點的rpc通信的能力,并且如博客開頭中所提到的,沒有喪失本地調(diào)用語義的簡潔性。
總結(jié)這篇博客是我關(guān)于Mit6.824分布式系統(tǒng)公開課lab的第一篇博客,按照計劃會將實現(xiàn)簡易版rpc和raft k/v數(shù)據(jù)庫的心得以博客的形式分享出來,希望能幫助到對分布式系統(tǒng)相關(guān)技術(shù)的小伙伴。打個廣告:對于英語不好(沒法直接啃生肉)但又對國外著名的計算機公開課(涉及操作系統(tǒng)、數(shù)據(jù)庫、分布式系統(tǒng)、編譯原理、計算機網(wǎng)絡(luò)、算法等等)感興趣的小伙伴,可以咨詢simviso購買中英翻譯質(zhì)量很高的公開課視頻(比如Mit6.824,b站上開放了一部分免費的視頻:https://www.bilibili.com/video/BV1x7411M7Sf)。博客中展示的完整代碼在我的github上:https://github.com/1399852153/MyRpc (release/lab1分支),內(nèi)容如有錯誤,還請多多指教。關(guān)鍵詞: