欧美三级电影完整|亚洲一二三四久久|性爱视频精品一区二区免费在线观看|国产精品啪啪视频|婷婷六月综合操人妻视频网站|99爱免费视频在线观看|美女一级片在线观看|北京熟女88av|免费看黄色A级电影|欧美黄色毛片儿

【分布式系統(tǒng)】RPC重新梳理一遍,收獲頗多

2023-04-12


文章目錄

  • 簡介
  • 調(diào)用過程
  • 動態(tài)代理
  • 接口定義
  • 接口實現(xiàn)
  • 代理類
  • 運行結(jié)果
  • 序列化
  • Serializable示例
  • Person類
  • 序列化代碼
  • temp_person.out文件
  • (1) 開頭
  • (2) 序列化ID
  • (3) 字段說明
  • (4)字段值的說明
  • 協(xié)議編碼
  • 網(wǎng)絡(luò)傳輸
  • 同步阻塞(blocking IO)
  • 同步非阻塞(non-blocking IO)
  • 多路復(fù)用(multiplexing IO)
  • RPC框架示例
  • 提供方
  • 接口定義
  • 接口實現(xiàn)
  • 發(fā)布方
  • 本地服務(wù)代理
  • 服務(wù)端模擬
  • 客戶端模擬
  • 客戶端輸出日志
  • 總結(jié)

我最早接觸分布式系統(tǒng)是在2017年,那時候使用了Dubbo服務(wù)治理框架,那時Dubbo還沒擁抱Spring Cloud,使用ZooKeeper實現(xiàn)服務(wù)注冊中心。也就從那時候起開始認識RPC框架。


簡介


RPC是Remote Procedure Call(遠程過程調(diào)用),即一臺服務(wù)器上的服務(wù)可以像調(diào)用本進程內(nèi)的方法一樣去調(diào)用遠程服務(wù)器上的方法,簡單點理解就是讓不同網(wǎng)絡(luò)節(jié)點的服務(wù)相互調(diào)用,是一種典型的分布式節(jié)點間同步通信的實現(xiàn)方式。


RPC常見的應(yīng)用領(lǐng)域:


  • 消息隊列
  • 分布式緩存
  • 分布式數(shù)據(jù)庫

RPC屏蔽了網(wǎng)絡(luò)細節(jié),使遠程服務(wù)的調(diào)用像本地一樣,所以給程序員提供了很大的方便,只需關(guān)注業(yè)務(wù)即可。


RPC常用框架:


  • Facebook的Apache Thrift
  • Hessian
  • gRPC
  • Dubbo

RPC框架的關(guān)鍵點:


  • 動態(tài)代理
    使客戶端可以像調(diào)用本地方法一樣調(diào)用服務(wù)端接口
  • 序列化
    將傳輸?shù)男畔⒋虬勺止?jié)碼,適應(yīng)網(wǎng)絡(luò)傳輸
  • 協(xié)議編碼
    對序列化信息進行標注,使其能順利的到達目的地,且能反序列化
  • 網(wǎng)絡(luò)傳輸
    使用socket套接字

調(diào)用過程



  1. 首先服務(wù)提供方制定接口和數(shù)據(jù)結(jié)構(gòu),然后將接口和數(shù)據(jù)結(jié)構(gòu)同步給服務(wù)調(diào)用方
  2. 服務(wù)調(diào)用方使用“動態(tài)代理”向提供方發(fā)起調(diào)用
  3. 服務(wù)調(diào)用方將代理信息進行序列化操作,編碼成二進制信息
  4. 服務(wù)調(diào)用方將編碼的二進制信息進行協(xié)議編碼,對二進制信息進行說明,比如數(shù)據(jù)包大小、請求類型等
  5. 服務(wù)提供方接收到服務(wù)調(diào)用方發(fā)過來的信息后,先進行協(xié)議解碼,解讀協(xié)議信息
  6. 服務(wù)提供方將協(xié)議解碼后的數(shù)據(jù)包進行反序列化
  7. 服務(wù)提供方使用反射機制,獲取動態(tài)代理封裝好的請求
  8. 服務(wù)提供方處理請求,發(fā)送響應(yīng)信息給調(diào)用方
  9. 響應(yīng)數(shù)據(jù)傳輸就是和請求處理一樣的過程

動態(tài)代理


服務(wù)調(diào)用者調(diào)用的服務(wù)實際是遠程服務(wù)的本地代理,這就涉及到了JDK動態(tài)代理。通過動態(tài)代理攔截機制,將本地調(diào)用封裝成遠程服務(wù)調(diào)用。


接口定義

package com.demo.dynamic.provide;

public interface HiService {
    public String hi(String username);
}

接口實現(xiàn)

package com.demo.dynamic.provide.impl;

import com.demo.dynamic.provide.HiService;

public class HiServiceImpl implements HiService {

    @Override
    public String hi(String username) {
        System.out.println("username: " + username);
        return "hi " + username;
    }
}

代理類

package com.demo.dynamic.client;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class DynamicHiSubject implements InvocationHandler {
    // 需要代理的真實對象
    private Object target = null;

    public DynamicHiSubject(Object object) {
        // 綁定真實對象
        this.target = object;
    }

    // 調(diào)用真實對象的方法并執(zhí)行
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        System.out.println("start now ......");

        Object retObj = method.invoke(target, args);

        System.out.println("end over");

        return retObj;
    }
}

DynamicHiSubject實現(xiàn)InvocationHandler接口后,通過反射的方式調(diào)用傳入真實對象的方法,正是因為這種對真實對象的不限定性,所以稱作動態(tài)代理。


package com.demo.dynamic.test;

import com.demo.dynamic.client.DynamicHiSubject;
import com.demo.dynamic.provide.HiService;
import com.demo.dynamic.provide.impl.HiServiceImpl;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class ProxyTest {
    public static void main(String[] args){
        // 聲明一個真實的服務(wù)端對象
        HiService hiService = new HiServiceImpl();
        // 聲明一個代理類對象,并在構(gòu)造器中進行代理對象和真實對象的綁定
        InvocationHandler handler = new DynamicHiSubject(hiService);
        // 生成代理對象實例
        // 第一個參數(shù)加載代理對象,第二個參數(shù)指定服務(wù)端接口,第三個參數(shù)將代理對象關(guān)聯(lián)到InvocationHandler
        HiService hiServiceProxy = (HiService) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
                hiService.getClass().getInterfaces(),
                handler);

        String retStr = hiServiceProxy.hi("lilei");

        System.out.println(retStr);
    }
}

運行結(jié)果

start now ......
username: lilei
end over
hi lilei

序列化


遠程調(diào)用需要將請求對象裝換成二進制碼流進行網(wǎng)絡(luò)傳輸,不同的序列化框架支持的數(shù)據(jù)類型、數(shù)據(jù)包大小、性能各不相同。反過來將二進制碼流轉(zhuǎn)換成對象的過程,稱為反序列化。
常見的序列化方式:


名稱


優(yōu)點


缺點


JSON


簡單;key-value方式;


空間開銷大;遇到類型轉(zhuǎn)換會損耗性能;


Hessian


跨語言;性能比JSON高;兼容性好;穩(wěn)定性好;


Protobuf


輕便;高效;適用結(jié)構(gòu)化數(shù)據(jù);多語言支持;


Thrift


高性能;輕量級;


兼容性差


JDK Serializable


性能相對差點,序列化數(shù)據(jù)較大


Serializable示例

我用JDK自帶的序列化研究一下,看看序列化后的數(shù)據(jù)是什么樣的。


Person類

package com.demo.serial.domain;

import java.io.Serializable;

public class Person implements Serializable {

    String name = "zhangsan";
}

序列化代碼

package com.demo.serial.test;

import com.demo.serial.domain.Person;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class PersonTest {

    public static void main(String args[]) throws IOException {
        FileOutputStream fos = new FileOutputStream("temp_person.out");
        ObjectOutputStream oos = new ObjectOutputStream(fos);
        oos.writeObject(new Person());
        oos.close();
    }
}

temp_person.out文件

使用16進制格式顯示,文件內(nèi)容如下:



表格顯示為(表中括號數(shù)字部分便于說明):


0


1


2


3


4


5


6


7


8


9


a


b


c


d


e


f


ac(1)


ed


00


05


73


72


00


1d


63


6f


6d


2e


64


65


6d


6f


2e


73


65


72


69


61


6c


2e


64


6f


6d


61


69


6e


2e


50


65


72


73


6f


6e


8c(2)


b8


96


4e


4a


1f


f8


5f


02(3)


00


01


4c


00


04


6e


61


6d


65


74


00


12


4c


6a


61


76


61


2f


6c


61


6e


67


2f


53


74


72


69


6e


67


3b


78(4)


70


74


00


08


7a


68


61


6e


67


73


61


6e


(1) 開頭

ac ed: Magic Number,幻數(shù),固定值


00 05: Version number,版本號


73: new Object,表示一個新對象


72: new Class Descriptor,聲明一個新類


00 1d: 類的長度,包含全路徑名,29個字節(jié),到第三行的6e字節(jié)



(2) 序列化ID

代碼中未指定,則隨機生成一個8字節(jié)的id


(3) 字段說明

02: Bit mask for ObjectStreamClass flag,Indicates class is Serializable。聲明該類支持序列化
00 01: 該類包含的字段數(shù)量
4c: ascii值是L,第一個域的類型
00 04: 表示字段長度,4個字節(jié)
6e 61 6d 65: 表示字段名稱name
74: new String,新的字符串對象
00 12: 類的全名,包含路徑共18字節(jié)
4c…3b: 標準對象簽名,java.lang.String;


(4)字段值的說明

78: 對象塊的結(jié)束標志
70: Null object reference,沒有其他父類的標志
74: 新的對象
00 08: 新的對象長度,8字節(jié)
7a…6e: zhangsan
由上看出,Serializable序列化后的數(shù)據(jù)主要包括:序列化協(xié)議、類聲明信息、域信息等。本例為了解讀序列化后的數(shù)據(jù)示例比較簡單,更詳細的內(nèi)容可以參考java.io.ObjectStreamConstants,這個接口里定義了流數(shù)據(jù)中的一些標記值。除了這些標記值以外,其它的十六進制都可以轉(zhuǎn)換成ascii碼,就可以知道是什么數(shù)據(jù)。


協(xié)議編碼


序列化將請求對象轉(zhuǎn)換成可以在網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù),這個時候還需要協(xié)議的支撐,因為二進制流在網(wǎng)絡(luò)上傳輸,需要包裝一些額外的信息,比如數(shù)據(jù)包發(fā)送到哪里,發(fā)送大小等信息。這就需要制定規(guī)則化的rpc協(xié)議。


協(xié)議是 RPC 的核心,它規(guī)范了數(shù)據(jù)在網(wǎng)絡(luò)中的傳輸內(nèi)容和格式。除必須的請求、響應(yīng)數(shù)據(jù)外,通常還會包含額外控制數(shù)據(jù),如單次請求的序列化方式、超時時間、壓縮方式和鑒權(quán)信息等。



以Dubbo2協(xié)議為例(官方圖示太小,我手繪一個):



  • Magic - Magic High & Magic Low (16 bits)
    使用值0xdabb標識dubbo協(xié)議
  • Req/Res (1 bit)
    請求、響應(yīng)標識位,1-請求,0-響應(yīng)
  • 2 Way (1 bit)
    僅當(dāng)請求時有效,是否返回值,1需要返回值
  • Event (1 bit)
    標識事件消息,1-是
  • Serialization ID (5 bit)
    標識序列化類型,fastjson是6
  • Status (8 bits)
    僅用于響應(yīng),狀態(tài)位主要有:
    20-OK、
    30-CLIENT_TIMEOUT、
    31-SERVER_TIMEOUT、
    40-BAD_REQUEST、
    50-BAD_RESPONSE、
    60-SERVICE_NOT_FOUND、
    70-SERVICE_ERROR、
    80-SERVER_ERROR、
    90-CLIENT_ERROR、
    100-SERVER_THREADPOOL_EXHAUSTED_ERROR
  • Request ID (64 bits)
    long類型,請求唯一標識
  • Data Length (32)
    Variable Part部分序列化后的字節(jié)長度,integer類型
  • Variable Part
    Req/Res=1,包含Dubbo version、Service name、Service version、Method name、Method parameter types、Method arguments、Attachments
    Req/Res=0,包含Return value type、Return value

RPC 協(xié)議的設(shè)計需要考慮以下內(nèi)容:


  • 通用性: 統(tǒng)一的二進制格式,跨語言、跨平臺、多傳輸層協(xié)議支持
  • 擴展性: 協(xié)議增加字段、升級、支持用戶擴展和附加業(yè)務(wù)元數(shù)據(jù)
  • 性能:As fast as it can be
  • 穿透性:能夠被各種終端設(shè)備識別和轉(zhuǎn)發(fā):網(wǎng)關(guān)、代理服務(wù)器等 通用性和高性能通常無法同時達到,需要協(xié)議設(shè)計者進行一定的取舍。


網(wǎng)絡(luò)傳輸


網(wǎng)絡(luò)程序所做的很大一部分工作都是簡單的輸入輸出:將數(shù)據(jù)字節(jié)從一個系統(tǒng)移動到另一個系統(tǒng)。在很大程度上講,讀取服務(wù)器發(fā)送給你的數(shù)據(jù)與讀取文件并沒有什么不同。向客戶端發(fā)送文本與寫文件也沒有什么不同。



制定協(xié)議之后,服務(wù)調(diào)用方和提供方有了統(tǒng)一的讀寫和轉(zhuǎn)換標準,那么接下來就要進行網(wǎng)絡(luò)傳輸了。RPC網(wǎng)絡(luò)傳輸本質(zhì)上是一次調(diào)用方和提供方之間網(wǎng)絡(luò)信息交換過程。如果屏蔽底層通信的細節(jié),那么我們就可以理解為,服務(wù)調(diào)用方知道服務(wù)提供方的ip、socket端口就可以了。如果要深究細節(jié),那么這一章節(jié)就涉及到了系統(tǒng)調(diào)用了。


操作系統(tǒng)的抽象層中有對外提供的接口,叫做系統(tǒng)調(diào)用,例如read、open、close等。跟進程的內(nèi)部函數(shù)調(diào)用不同的是,這種系統(tǒng)調(diào)用會讓進程從用戶態(tài)切換到內(nèi)核態(tài),也就是到操作系統(tǒng)的內(nèi)核代碼中去執(zhí)行。




Linux的內(nèi)核將所有外部設(shè)備都看做一個文件來操作,對一個文件的讀寫操作會調(diào)用內(nèi)核提供的系統(tǒng)命令,返回一個file descriptor(fd,文件描述符)。而對一個socket的讀寫也會有相應(yīng)的描述符,稱為socketfd(socket 描述符),描述符就是一個數(shù)字,它指向內(nèi)核中的一個結(jié)構(gòu)體(文件路徑,數(shù)據(jù)區(qū)等一些屬性)。



由此得知,用戶程序不能直接訪問網(wǎng)卡,必須通過系統(tǒng)調(diào)用去訪問網(wǎng)卡,我們從數(shù)據(jù)流轉(zhuǎn)的過程總結(jié)如下:



由于在網(wǎng)絡(luò)上傳輸二進制流,同一個請求可能分成好幾塊,并且到達目的地的時間有先后,那么程序計算必須在所有數(shù)據(jù)包獲取后才能進行。為了避免程序阻塞問題,IO模型一直在演變,常見的IO模型主要有:


  • 同步阻塞(blocking IO)
  • 同步非阻塞(non-blocking IO)
  • 多路復(fù)用(multiplexing IO)

同步阻塞(blocking IO)

用戶程序發(fā)起請求后,需要一直等待響應(yīng),不能進行其它操作。



同步非阻塞(non-blocking IO)

用戶程序發(fā)起請求后,無需一直等待。會不斷的輪詢,直到數(shù)據(jù)準備好。



多路復(fù)用(multiplexing IO)

在同步非阻塞基礎(chǔ)上,添加一個獨立的進程專門負責(zé)監(jiān)聽多個請求的數(shù)據(jù)狀態(tài),發(fā)現(xiàn)數(shù)據(jù)準備好主動通知所屬請求進程去處理。



RPC框架示例


參考《分布式服務(wù)框架原理與實踐》李林鋒/著



通過Java原生序列化、Socket通信、動態(tài)代理和反射機制,實現(xiàn)最簡單的RPC框架。
提供方:負責(zé)提供服務(wù)接口和服務(wù)實現(xiàn)類
發(fā)布方:運行在服務(wù)端,負責(zé)發(fā)布服務(wù)供消費者調(diào)用
本地服務(wù)代理:運行在客戶端,通過代理調(diào)用遠程服務(wù)提供者,并將響應(yīng)結(jié)果返回給本地消費者


提供方

接口定義

package com.demo.rpc.service;

public interface HiService {
    public String sayHi(String username);
}

接口實現(xiàn)

package com.demo.rpc.service.impl;

import com.demo.rpc.service.HiService;

public class HiServiceImpl implements HiService {
    @Override
    public String sayHi(String username) {
        return username != null ? "hi, " + username : "hi";
    }
}

發(fā)布方

  1. 監(jiān)聽TCP連接,將客戶端請求封裝成Task使用線程池運行
  2. 將客戶端發(fā)送的碼流反序列化,反射調(diào)用服務(wù)實現(xiàn)者,獲取執(zhí)行結(jié)果
  3. 將返回結(jié)果反序列化,發(fā)送給客戶端
  4. 調(diào)用結(jié)束后釋放資源
package com.demo.rpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class RpcRegister {

    static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public static void register(String host, int port) throws Exception {
        ServerSocket serverSocket = new ServerSocket();
        serverSocket.bind(new InetSocketAddress(host, port));

        try{
            while (true) {
                executor.execute(new RegisterTask(serverSocket.accept()));
            }
        } finally {
            serverSocket.close();
        }
    }

    private static class RegisterTask implements Runnable {

        Socket socket = null;

        public RegisterTask(Socket client) {
            this.socket = client;
        }

        @Override
        public void run() {
            ObjectInputStream inputStream = null;
            ObjectOutputStream outputStream = null;
            try {
                inputStream = new ObjectInputStream(socket.getInputStream());
                // 第一個字符串是接口名
                String interfaceName = inputStream.readUTF();
                // 類加載
                Class service = Class.forName(interfaceName);
                // 第二個字符串是方法名
                String methodName = inputStream.readUTF();
                // 第三個對象是參數(shù)類型
                Class[] parameterTypes = (Class[])inputStream.readObject();
                // 第四個對象是參數(shù)值
                Object[] arguments = (Object[])inputStream.readObject();
                // 使用反射機制,調(diào)用業(yè)務(wù)邏輯
                Method method = service.getMethod(methodName, parameterTypes);
                Object result = method.invoke(service.newInstance(), arguments);
                // 返回數(shù)據(jù)
                outputStream = new ObjectOutputStream(socket.getOutputStream());
                outputStream.writeObject(result);

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (outputStream != null) {
                    try {
                        outputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (inputStream != null) {
                    try {
                        inputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

本地服務(wù)代理

  1. 將本地的接口調(diào)用轉(zhuǎn)換成JDK的動態(tài)代理,在動態(tài)代理中實現(xiàn)接口的遠程調(diào)用
  2. 創(chuàng)建Socket客戶端,根據(jù)指定地址連接遠程服務(wù)提供者
  3. 將遠程服務(wù)調(diào)用所需的接口類、方法名、參數(shù)類型、參數(shù)值等序列化后發(fā)送到服務(wù)端
  4. 同步阻塞等待響應(yīng)信息,獲取應(yīng)答信息后返回
package com.demo.rpc;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

public class RpcClient {
    public T call(final Class serviceClass, final InetSocketAddress addr) {
        return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(),
                new Class[]{serviceClass.getInterfaces()[0]},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Socket socket = null;
                        ObjectOutputStream outputStream = null;
                        ObjectInputStream inputStream = null;
                        try {
                            socket = new Socket();
                            socket.connect(addr);
                            outputStream = new ObjectOutputStream(socket.getOutputStream());
                            // 設(shè)置請求代理的類名稱
                            outputStream.writeUTF(serviceClass.getName());
                            // 設(shè)置請求方法名
                            outputStream.writeUTF(method.getName());
                            // 設(shè)置請求方法參數(shù)類型
                            outputStream.writeObject(method.getParameterTypes());
                            // 設(shè)置請求方法參數(shù)
                            outputStream.writeObject(args);
                            inputStream = new ObjectInputStream(socket.getInputStream());
                            return inputStream.readObject();
                        } finally {
                            if (socket != null) {
                                socket.close();
                            }
                            if (outputStream != null) {
                                outputStream.close();
                            }
                            if (inputStream != null) {
                                inputStream.close();
                            }
                        }
                    }
                });
    }
}

服務(wù)端模擬

package com.demo.rpc;

public class ServerStart {
    public static void main(String[] args){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    RpcRegister.register("127.0.0.1", 8088);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

客戶端模擬

package com.demo.rpc;

import com.demo.rpc.service.HiService;
import com.demo.rpc.service.impl.HiServiceImpl;

import java.net.InetSocketAddress;

public class ClientStart {
    public static void main(String[] args){
        RpcClient rpcClient = new RpcClient<>();

        HiService hiService = rpcClient.call(HiServiceImpl.class, new InetSocketAddress("127.0.0.1", 8088));

        System.out.println(hiService.sayHi("China Qingdao"));
    }
}

客戶端輸出日志

hi, China Qingdao

此示例展示了一個完整的rpc調(diào)用流程,之前使用dubbo的時候,生產(chǎn)者只需暴露接口及相關(guān)序列化,且注冊到zookeeper即可,屏蔽了底層細節(jié)。


總結(jié)


通過梳理RPC相關(guān)知識點,加深了對動態(tài)代理、序列化和協(xié)議編碼的理解,熟悉了rpc的調(diào)用流程,為后邊深入研究一下Dubbo打下了良好的基礎(chǔ)。




本文僅代表作者觀點,版權(quán)歸原創(chuàng)者所有,如需轉(zhuǎn)載請在文中注明來源及作者名字。

免責(zé)聲明:本文系轉(zhuǎn)載編輯文章,僅作分享之用。如分享內(nèi)容、圖片侵犯到您的版權(quán)或非授權(quán)發(fā)布,請及時與我們聯(lián)系進行審核處理或刪除,您可以發(fā)送材料至郵箱:service@tojoy.com