【分布式系統(tǒng)】RPC重新梳理一遍,收獲頗多
文章目錄
- 簡介
- 調(diào)用過程
- 動態(tài)代理
- 接口定義
- 接口實現(xiàn)
- 代理類
- 運行結(jié)果
- 序列化
- Serializable示例
- Person類
- 序列化代碼
- temp_person.out文件
- (1) 開頭
- (2) 序列化ID
- (3) 字段說明
- (4)字段值的說明
- 協(xié)議編碼
- 網(wǎng)絡(luò)傳輸
- 同步阻塞(blocking IO)
- 同步非阻塞(non-blocking IO)
- 多路復(fù)用(multiplexing IO)
- RPC框架示例
- 提供方
- 接口定義
- 接口實現(xiàn)
- 發(fā)布方
- 本地服務(wù)代理
- 服務(wù)端模擬
- 客戶端模擬
- 客戶端輸出日志
- 總結(jié)
我最早接觸分布式系統(tǒng)是在2017年,那時候使用了Dubbo服務(wù)治理框架,那時Dubbo還沒擁抱Spring Cloud,使用ZooKeeper實現(xiàn)服務(wù)注冊中心。也就從那時候起開始認識RPC框架。
簡介
RPC是Remote Procedure Call(遠程過程調(diào)用),即一臺服務(wù)器上的服務(wù)可以像調(diào)用本進程內(nèi)的方法一樣去調(diào)用遠程服務(wù)器上的方法,簡單點理解就是讓不同網(wǎng)絡(luò)節(jié)點的服務(wù)相互調(diào)用,是一種典型的分布式節(jié)點間同步通信的實現(xiàn)方式。
RPC常見的應(yīng)用領(lǐng)域:
- 消息隊列
- 分布式緩存
- 分布式數(shù)據(jù)庫
RPC屏蔽了網(wǎng)絡(luò)細節(jié),使遠程服務(wù)的調(diào)用像本地一樣,所以給程序員提供了很大的方便,只需關(guān)注業(yè)務(wù)即可。
RPC常用框架:
- Facebook的Apache Thrift
- Hessian
- gRPC
- Dubbo
RPC框架的關(guān)鍵點:
- 動態(tài)代理
使客戶端可以像調(diào)用本地方法一樣調(diào)用服務(wù)端接口 - 序列化
將傳輸?shù)男畔⒋虬勺止?jié)碼,適應(yīng)網(wǎng)絡(luò)傳輸 - 協(xié)議編碼
對序列化信息進行標注,使其能順利的到達目的地,且能反序列化 - 網(wǎng)絡(luò)傳輸
使用socket套接字
調(diào)用過程

- 首先服務(wù)提供方制定接口和數(shù)據(jù)結(jié)構(gòu),然后將接口和數(shù)據(jù)結(jié)構(gòu)同步給服務(wù)調(diào)用方
- 服務(wù)調(diào)用方使用“動態(tài)代理”向提供方發(fā)起調(diào)用
- 服務(wù)調(diào)用方將代理信息進行序列化操作,編碼成二進制信息
- 服務(wù)調(diào)用方將編碼的二進制信息進行協(xié)議編碼,對二進制信息進行說明,比如數(shù)據(jù)包大小、請求類型等
- 服務(wù)提供方接收到服務(wù)調(diào)用方發(fā)過來的信息后,先進行協(xié)議解碼,解讀協(xié)議信息
- 服務(wù)提供方將協(xié)議解碼后的數(shù)據(jù)包進行反序列化
- 服務(wù)提供方使用反射機制,獲取動態(tài)代理封裝好的請求
- 服務(wù)提供方處理請求,發(fā)送響應(yīng)信息給調(diào)用方
- 響應(yīng)數(shù)據(jù)傳輸就是和請求處理一樣的過程
動態(tài)代理
服務(wù)調(diào)用者調(diào)用的服務(wù)實際是遠程服務(wù)的本地代理,這就涉及到了JDK動態(tài)代理。通過動態(tài)代理攔截機制,將本地調(diào)用封裝成遠程服務(wù)調(diào)用。
接口定義
package com.demo.dynamic.provide;
public interface HiService {
public String hi(String username);
}
接口實現(xiàn)
package com.demo.dynamic.provide.impl;
import com.demo.dynamic.provide.HiService;
public class HiServiceImpl implements HiService {
@Override
public String hi(String username) {
System.out.println("username: " + username);
return "hi " + username;
}
}
代理類
package com.demo.dynamic.client;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public class DynamicHiSubject implements InvocationHandler {
// 需要代理的真實對象
private Object target = null;
public DynamicHiSubject(Object object) {
// 綁定真實對象
this.target = object;
}
// 調(diào)用真實對象的方法并執(zhí)行
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("start now ......");
Object retObj = method.invoke(target, args);
System.out.println("end over");
return retObj;
}
}
DynamicHiSubject實現(xiàn)InvocationHandler接口后,通過反射的方式調(diào)用傳入真實對象的方法,正是因為這種對真實對象的不限定性,所以稱作動態(tài)代理。
package com.demo.dynamic.test;
import com.demo.dynamic.client.DynamicHiSubject;
import com.demo.dynamic.provide.HiService;
import com.demo.dynamic.provide.impl.HiServiceImpl;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
public class ProxyTest {
public static void main(String[] args){
// 聲明一個真實的服務(wù)端對象
HiService hiService = new HiServiceImpl();
// 聲明一個代理類對象,并在構(gòu)造器中進行代理對象和真實對象的綁定
InvocationHandler handler = new DynamicHiSubject(hiService);
// 生成代理對象實例
// 第一個參數(shù)加載代理對象,第二個參數(shù)指定服務(wù)端接口,第三個參數(shù)將代理對象關(guān)聯(lián)到InvocationHandler
HiService hiServiceProxy = (HiService) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
hiService.getClass().getInterfaces(),
handler);
String retStr = hiServiceProxy.hi("lilei");
System.out.println(retStr);
}
}
運行結(jié)果
start now ......
username: lilei
end over
hi lilei
序列化
遠程調(diào)用需要將請求對象裝換成二進制碼流進行網(wǎng)絡(luò)傳輸,不同的序列化框架支持的數(shù)據(jù)類型、數(shù)據(jù)包大小、性能各不相同。反過來將二進制碼流轉(zhuǎn)換成對象的過程,稱為反序列化。
常見的序列化方式:
名稱 |
優(yōu)點 |
缺點 |
JSON |
簡單;key-value方式; |
空間開銷大;遇到類型轉(zhuǎn)換會損耗性能; |
Hessian |
跨語言;性能比JSON高;兼容性好;穩(wěn)定性好; |
|
Protobuf |
輕便;高效;適用結(jié)構(gòu)化數(shù)據(jù);多語言支持; |
|
Thrift |
高性能;輕量級; |
兼容性差 |
JDK Serializable |
性能相對差點,序列化數(shù)據(jù)較大 |
Serializable示例
我用JDK自帶的序列化研究一下,看看序列化后的數(shù)據(jù)是什么樣的。
Person類
package com.demo.serial.domain;
import java.io.Serializable;
public class Person implements Serializable {
String name = "zhangsan";
}
序列化代碼
package com.demo.serial.test;
import com.demo.serial.domain.Person;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
public class PersonTest {
public static void main(String args[]) throws IOException {
FileOutputStream fos = new FileOutputStream("temp_person.out");
ObjectOutputStream oos = new ObjectOutputStream(fos);
oos.writeObject(new Person());
oos.close();
}
}
temp_person.out文件
使用16進制格式顯示,文件內(nèi)容如下:

表格顯示為(表中括號數(shù)字部分便于說明):
0 |
1 |
2 |
3 |
4 |
5 |
6 |
7 |
8 |
9 |
a |
b |
c |
d |
e |
f |
ac(1) |
ed |
00 |
05 |
73 |
72 |
00 |
1d |
63 |
6f |
6d |
2e |
64 |
65 |
6d |
6f |
2e |
73 |
65 |
72 |
69 |
61 |
6c |
2e |
64 |
6f |
6d |
61 |
69 |
6e |
2e |
50 |
65 |
72 |
73 |
6f |
6e |
8c(2) |
b8 |
96 |
4e |
4a |
1f |
f8 |
5f |
02(3) |
00 |
01 |
4c |
00 |
04 |
6e |
61 |
6d |
65 |
74 |
00 |
12 |
4c |
6a |
61 |
76 |
61 |
2f |
6c |
61 |
6e |
67 |
2f |
53 |
74 |
72 |
69 |
6e |
67 |
3b |
78(4) |
70 |
74 |
00 |
08 |
7a |
68 |
61 |
6e |
67 |
73 |
61 |
6e |
(1) 開頭
ac ed: Magic Number,幻數(shù),固定值
00 05: Version number,版本號
73: new Object,表示一個新對象
72: new Class Descriptor,聲明一個新類
00 1d: 類的長度,包含全路徑名,29個字節(jié),到第三行的6e字節(jié)

(2) 序列化ID
代碼中未指定,則隨機生成一個8字節(jié)的id
(3) 字段說明
02: Bit mask for ObjectStreamClass flag,Indicates class is Serializable。聲明該類支持序列化
00 01: 該類包含的字段數(shù)量
4c: ascii值是L,第一個域的類型
00 04: 表示字段長度,4個字節(jié)
6e 61 6d 65: 表示字段名稱name
74: new String,新的字符串對象
00 12: 類的全名,包含路徑共18字節(jié)
4c…3b: 標準對象簽名,java.lang.String;
(4)字段值的說明
78: 對象塊的結(jié)束標志
70: Null object reference,沒有其他父類的標志
74: 新的對象
00 08: 新的對象長度,8字節(jié)
7a…6e: zhangsan
由上看出,Serializable序列化后的數(shù)據(jù)主要包括:序列化協(xié)議、類聲明信息、域信息等。本例為了解讀序列化后的數(shù)據(jù)示例比較簡單,更詳細的內(nèi)容可以參考java.io.ObjectStreamConstants,這個接口里定義了流數(shù)據(jù)中的一些標記值。除了這些標記值以外,其它的十六進制都可以轉(zhuǎn)換成ascii碼,就可以知道是什么數(shù)據(jù)。
協(xié)議編碼
序列化將請求對象轉(zhuǎn)換成可以在網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù),這個時候還需要協(xié)議的支撐,因為二進制流在網(wǎng)絡(luò)上傳輸,需要包裝一些額外的信息,比如數(shù)據(jù)包發(fā)送到哪里,發(fā)送大小等信息。這就需要制定規(guī)則化的rpc協(xié)議。
協(xié)議是 RPC 的核心,它規(guī)范了數(shù)據(jù)在網(wǎng)絡(luò)中的傳輸內(nèi)容和格式。除必須的請求、響應(yīng)數(shù)據(jù)外,通常還會包含額外控制數(shù)據(jù),如單次請求的序列化方式、超時時間、壓縮方式和鑒權(quán)信息等。
以Dubbo2協(xié)議為例(官方圖示太小,我手繪一個):

- Magic - Magic High & Magic Low (16 bits)
使用值0xdabb標識dubbo協(xié)議 - Req/Res (1 bit)
請求、響應(yīng)標識位,1-請求,0-響應(yīng) - 2 Way (1 bit)
僅當(dāng)請求時有效,是否返回值,1需要返回值 - Event (1 bit)
標識事件消息,1-是 - Serialization ID (5 bit)
標識序列化類型,fastjson是6 - Status (8 bits)
僅用于響應(yīng),狀態(tài)位主要有:
20-OK、
30-CLIENT_TIMEOUT、
31-SERVER_TIMEOUT、
40-BAD_REQUEST、
50-BAD_RESPONSE、
60-SERVICE_NOT_FOUND、
70-SERVICE_ERROR、
80-SERVER_ERROR、
90-CLIENT_ERROR、
100-SERVER_THREADPOOL_EXHAUSTED_ERROR - Request ID (64 bits)
long類型,請求唯一標識 - Data Length (32)
Variable Part部分序列化后的字節(jié)長度,integer類型 - Variable Part
Req/Res=1,包含Dubbo version、Service name、Service version、Method name、Method parameter types、Method arguments、Attachments
Req/Res=0,包含Return value type、Return value
RPC 協(xié)議的設(shè)計需要考慮以下內(nèi)容:
- 通用性: 統(tǒng)一的二進制格式,跨語言、跨平臺、多傳輸層協(xié)議支持
- 擴展性: 協(xié)議增加字段、升級、支持用戶擴展和附加業(yè)務(wù)元數(shù)據(jù)
- 性能:As fast as it can be
- 穿透性:能夠被各種終端設(shè)備識別和轉(zhuǎn)發(fā):網(wǎng)關(guān)、代理服務(wù)器等 通用性和高性能通常無法同時達到,需要協(xié)議設(shè)計者進行一定的取舍。
網(wǎng)絡(luò)傳輸
網(wǎng)絡(luò)程序所做的很大一部分工作都是簡單的輸入輸出:將數(shù)據(jù)字節(jié)從一個系統(tǒng)移動到另一個系統(tǒng)。在很大程度上講,讀取服務(wù)器發(fā)送給你的數(shù)據(jù)與讀取文件并沒有什么不同。向客戶端發(fā)送文本與寫文件也沒有什么不同。
制定協(xié)議之后,服務(wù)調(diào)用方和提供方有了統(tǒng)一的讀寫和轉(zhuǎn)換標準,那么接下來就要進行網(wǎng)絡(luò)傳輸了。RPC網(wǎng)絡(luò)傳輸本質(zhì)上是一次調(diào)用方和提供方之間網(wǎng)絡(luò)信息交換過程。如果屏蔽底層通信的細節(jié),那么我們就可以理解為,服務(wù)調(diào)用方知道服務(wù)提供方的ip、socket端口就可以了。如果要深究細節(jié),那么這一章節(jié)就涉及到了系統(tǒng)調(diào)用了。
操作系統(tǒng)的抽象層中有對外提供的接口,叫做系統(tǒng)調(diào)用,例如read、open、close等。跟進程的內(nèi)部函數(shù)調(diào)用不同的是,這種系統(tǒng)調(diào)用會讓進程從用戶態(tài)切換到內(nèi)核態(tài),也就是到操作系統(tǒng)的內(nèi)核代碼中去執(zhí)行。

Linux的內(nèi)核將所有外部設(shè)備都看做一個文件來操作,對一個文件的讀寫操作會調(diào)用內(nèi)核提供的系統(tǒng)命令,返回一個file descriptor(fd,文件描述符)。而對一個socket的讀寫也會有相應(yīng)的描述符,稱為socketfd(socket 描述符),描述符就是一個數(shù)字,它指向內(nèi)核中的一個結(jié)構(gòu)體(文件路徑,數(shù)據(jù)區(qū)等一些屬性)。
由此得知,用戶程序不能直接訪問網(wǎng)卡,必須通過系統(tǒng)調(diào)用去訪問網(wǎng)卡,我們從數(shù)據(jù)流轉(zhuǎn)的過程總結(jié)如下:

由于在網(wǎng)絡(luò)上傳輸二進制流,同一個請求可能分成好幾塊,并且到達目的地的時間有先后,那么程序計算必須在所有數(shù)據(jù)包獲取后才能進行。為了避免程序阻塞問題,IO模型一直在演變,常見的IO模型主要有:
- 同步阻塞(blocking IO)
- 同步非阻塞(non-blocking IO)
- 多路復(fù)用(multiplexing IO)
同步阻塞(blocking IO)
用戶程序發(fā)起請求后,需要一直等待響應(yīng),不能進行其它操作。

同步非阻塞(non-blocking IO)
用戶程序發(fā)起請求后,無需一直等待。會不斷的輪詢,直到數(shù)據(jù)準備好。

多路復(fù)用(multiplexing IO)
在同步非阻塞基礎(chǔ)上,添加一個獨立的進程專門負責(zé)監(jiān)聽多個請求的數(shù)據(jù)狀態(tài),發(fā)現(xiàn)數(shù)據(jù)準備好主動通知所屬請求進程去處理。

RPC框架示例
參考《分布式服務(wù)框架原理與實踐》李林鋒/著
通過Java原生序列化、Socket通信、動態(tài)代理和反射機制,實現(xiàn)最簡單的RPC框架。
提供方:負責(zé)提供服務(wù)接口和服務(wù)實現(xiàn)類
發(fā)布方:運行在服務(wù)端,負責(zé)發(fā)布服務(wù)供消費者調(diào)用
本地服務(wù)代理:運行在客戶端,通過代理調(diào)用遠程服務(wù)提供者,并將響應(yīng)結(jié)果返回給本地消費者
提供方
接口定義
package com.demo.rpc.service;
public interface HiService {
public String sayHi(String username);
}
接口實現(xiàn)
package com.demo.rpc.service.impl;
import com.demo.rpc.service.HiService;
public class HiServiceImpl implements HiService {
@Override
public String sayHi(String username) {
return username != null ? "hi, " + username : "hi";
}
}
發(fā)布方
- 監(jiān)聽TCP連接,將客戶端請求封裝成Task使用線程池運行
- 將客戶端發(fā)送的碼流反序列化,反射調(diào)用服務(wù)實現(xiàn)者,獲取執(zhí)行結(jié)果
- 將返回結(jié)果反序列化,發(fā)送給客戶端
- 調(diào)用結(jié)束后釋放資源
package com.demo.rpc;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class RpcRegister {
static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public static void register(String host, int port) throws Exception {
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(host, port));
try{
while (true) {
executor.execute(new RegisterTask(serverSocket.accept()));
}
} finally {
serverSocket.close();
}
}
private static class RegisterTask implements Runnable {
Socket socket = null;
public RegisterTask(Socket client) {
this.socket = client;
}
@Override
public void run() {
ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
try {
inputStream = new ObjectInputStream(socket.getInputStream());
// 第一個字符串是接口名
String interfaceName = inputStream.readUTF();
// 類加載
Class> service = Class.forName(interfaceName);
// 第二個字符串是方法名
String methodName = inputStream.readUTF();
// 第三個對象是參數(shù)類型
Class>[] parameterTypes = (Class>[])inputStream.readObject();
// 第四個對象是參數(shù)值
Object[] arguments = (Object[])inputStream.readObject();
// 使用反射機制,調(diào)用業(yè)務(wù)邏輯
Method method = service.getMethod(methodName, parameterTypes);
Object result = method.invoke(service.newInstance(), arguments);
// 返回數(shù)據(jù)
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
本地服務(wù)代理
- 將本地的接口調(diào)用轉(zhuǎn)換成JDK的動態(tài)代理,在動態(tài)代理中實現(xiàn)接口的遠程調(diào)用
- 創(chuàng)建Socket客戶端,根據(jù)指定地址連接遠程服務(wù)提供者
- 將遠程服務(wù)調(diào)用所需的接口類、方法名、參數(shù)類型、參數(shù)值等序列化后發(fā)送到服務(wù)端
- 同步阻塞等待響應(yīng)信息,獲取應(yīng)答信息后返回
package com.demo.rpc;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
public class RpcClient {
public T call(final Class> serviceClass, final InetSocketAddress addr) {
return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(),
new Class>[]{serviceClass.getInterfaces()[0]},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = null;
ObjectOutputStream outputStream = null;
ObjectInputStream inputStream = null;
try {
socket = new Socket();
socket.connect(addr);
outputStream = new ObjectOutputStream(socket.getOutputStream());
// 設(shè)置請求代理的類名稱
outputStream.writeUTF(serviceClass.getName());
// 設(shè)置請求方法名
outputStream.writeUTF(method.getName());
// 設(shè)置請求方法參數(shù)類型
outputStream.writeObject(method.getParameterTypes());
// 設(shè)置請求方法參數(shù)
outputStream.writeObject(args);
inputStream = new ObjectInputStream(socket.getInputStream());
return inputStream.readObject();
} finally {
if (socket != null) {
socket.close();
}
if (outputStream != null) {
outputStream.close();
}
if (inputStream != null) {
inputStream.close();
}
}
}
});
}
}
服務(wù)端模擬
package com.demo.rpc;
public class ServerStart {
public static void main(String[] args){
new Thread(new Runnable() {
@Override
public void run() {
try {
RpcRegister.register("127.0.0.1", 8088);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
客戶端模擬
package com.demo.rpc;
import com.demo.rpc.service.HiService;
import com.demo.rpc.service.impl.HiServiceImpl;
import java.net.InetSocketAddress;
public class ClientStart {
public static void main(String[] args){
RpcClient rpcClient = new RpcClient<>();
HiService hiService = rpcClient.call(HiServiceImpl.class, new InetSocketAddress("127.0.0.1", 8088));
System.out.println(hiService.sayHi("China Qingdao"));
}
}
客戶端輸出日志
hi, China Qingdao
此示例展示了一個完整的rpc調(diào)用流程,之前使用dubbo的時候,生產(chǎn)者只需暴露接口及相關(guān)序列化,且注冊到zookeeper即可,屏蔽了底層細節(jié)。
總結(jié)
通過梳理RPC相關(guān)知識點,加深了對動態(tài)代理、序列化和協(xié)議編碼的理解,熟悉了rpc的調(diào)用流程,為后邊深入研究一下Dubbo打下了良好的基礎(chǔ)。
本文僅代表作者觀點,版權(quán)歸原創(chuàng)者所有,如需轉(zhuǎn)載請在文中注明來源及作者名字。
免責(zé)聲明:本文系轉(zhuǎn)載編輯文章,僅作分享之用。如分享內(nèi)容、圖片侵犯到您的版權(quán)或非授權(quán)發(fā)布,請及時與我們聯(lián)系進行審核處理或刪除,您可以發(fā)送材料至郵箱:service@tojoy.com





