一、为什么选择 brpc?

1.1 使用背景

在现代互联网架构中,微服务已经成为主流架构模式。当系统被拆分成多个服务后,服务间的高效通信变得至关重要。传统的 HTTP REST API 虽然简单,但在高并发、低延迟场景下性能不足。这时候就需要一个高性能的 RPC 框架。

brpc(better RPC)是百度开源并贡献给 Apache 基金会的工业级 RPC 框架,广泛应用于百度内部的搜索、存储、机器学习、广告、推荐等高性能系统。

1.2 核心优势

性能优势

  • 极高的吞吐量:单连接可达 800MB/s 吞吐量,多连接模式下可达 2.3GB/s
  • 超低延迟:最小化锁竞争,即使在 50万+ QPS 下也几乎看不到锁竞争
  • 优于 gRPC:同等条件下,brpc 的 QPS 比 gRPC 高约 10000,且在大包(>8KB)场景下性能优势更明显

架构优势

  • 无 IO/业务线程分离:brpc 智能地将 IO 和业务处理组合,实现更好的并发和效率
  • 动态线程管理:根据负载自动调整线程数,每个请求创建一个 bthread,处理完自动结束
  • 完全并行化:不同客户端的请求读取和解析完全并行化

易用性优势

  • 协议自动识别:服务器自动检测支持的协议,一个端口可接受多种协议
  • 丰富的协议支持:支持 HTTP/HTTPS、H2/gRPC、Redis、Memcached 等多种协议
  • 完善的调试工具:内置强大的性能分析和调试工具

二、快速入门

2.1 环境准备

Ubuntu/Debian 系统

1
2
3
4
5
6
7
8
# 安装基础依赖
sudo apt-get install -y git g++ make libssl-dev libgflags-dev \
    libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev

# 安装测试框架(可选)
sudo apt-get install -y cmake libgtest-dev
cd /usr/src/gtest && sudo cmake . && sudo make
sudo mv lib/libgtest* /usr/lib/ && cd -

CentOS 系统

1
2
3
4
5
6
# 安装 EPEL
sudo yum install epel-release

# 安装依赖
sudo yum install git gcc-c++ make openssl-devel gflags-devel \
    protobuf-devel protobuf-compiler leveldb-devel

2.2 编译安装 brpc

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# 克隆代码
git clone https://github.com/apache/brpc.git
cd brpc

# 编译
sh config_brpc.sh --headers=/usr/include --libs=/usr/lib
make -j8

# 运行示例
cd example/echo_c++
make
./echo_server &
./echo_client

2.3 第一个 brpc 程序

Step 1: 定义服务接口(echo.proto)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
syntax = "proto2";
package example;

option cc_generic_services = true;

message EchoRequest {
    required string message = 1;
}

message EchoResponse {
    required string message = 1;
}

service EchoService {
    rpc Echo(EchoRequest) returns (EchoResponse);
}

Step 2: 生成代码

1
protoc --cpp_out=. echo.proto

Step 3: 实现服务端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <brpc/server.h>
#include "echo.pb.h"

// 实现服务
class EchoServiceImpl : public example::EchoService {
public:
    void Echo(google::protobuf::RpcController* cntl_base,
              const example::EchoRequest* request,
              example::EchoResponse* response,
              google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);

        // 业务逻辑
        response->set_message(request->message());

        LOG(INFO) << "Received: " << request->message();
    }
};

int main(int argc, char* argv[]) {
    // 创建服务器
    brpc::Server server;

    // 添加服务
    EchoServiceImpl echo_service;
    if (server.AddService(&echo_service,
                          brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
        LOG(ERROR) << "Fail to add service";
        return -1;
    }

    // 启动服务器
    brpc::ServerOptions options;
    options.num_threads = 8;  // 工作线程数
    if (server.Start(8000, &options) != 0) {
        LOG(ERROR) << "Fail to start server";
        return -1;
    }

    // 等待退出
    server.RunUntilAskedToQuit();
    return 0;
}

Step 4: 实现客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <brpc/channel.h>
#include "echo.pb.h"

int main(int argc, char* argv[]) {
    // 创建 Channel
    brpc::Channel channel;
    brpc::ChannelOptions options;
    options.protocol = brpc::PROTOCOL_BAIDU_STD;
    options.timeout_ms = 100;
    options.max_retry = 3;

    if (channel.Init("127.0.0.1:8000", &options) != 0) {
        LOG(ERROR) << "Fail to initialize channel";
        return -1;
    }

    // 创建服务 stub
    example::EchoService_Stub stub(&channel);

    // 发送请求
    example::EchoRequest request;
    example::EchoResponse response;
    brpc::Controller cntl;

    request.set_message("Hello brpc!");
    stub.Echo(&cntl, &request, &response, NULL);

    if (!cntl.Failed()) {
        LOG(INFO) << "Response: " << response.message();
    } else {
        LOG(ERROR) << cntl.ErrorText();
    }

    return 0;
}

三、核心概念详解

3.1 bthread(协程)- brpc 的核心

3.1.1 什么是 bthread

bthread 是 brpc 实现的 M:N 线程库,是 brpc 高性能的关键。与传统的 pthread 不同:

  • 轻量级:创建和切换开销极小
  • 高并发:单机可支持百万级 bthread
  • 同步编程:用同步方式写异步代码

3.1.2 基础 bthread 使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <bthread/bthread.h>

// 基础 bthread 创建
void* worker_function(void* arg) {
    LOG(INFO) << "Running in bthread " << bthread_self();

    // 模拟工作
    bthread_usleep(1000);  // 睡眠 1ms,不会阻塞 pthread

    return NULL;
}

int main() {
    bthread_t tid;

    // 创建 bthread
    if (bthread_start_urgent(&tid, NULL, worker_function, NULL) != 0) {
        LOG(ERROR) << "Fail to create bthread";
        return -1;
    }

    // 等待 bthread 结束
    bthread_join(tid, NULL);
    return 0;
}

3.1.3 bthread 批量创建示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <vector>
#include <atomic>

std::atomic<int> counter{0};

void* count_task(void* arg) {
    int* task_id = (int*)arg;

    // 模拟计算任务
    for (int i = 0; i < 1000; ++i) {
        counter.fetch_add(1);
    }

    LOG(INFO) << "Task " << *task_id << " completed by bthread "
              << bthread_self();
    return NULL;
}

void BatchCreateBthreads() {
    const int THREAD_COUNT = 10000;  // 创建 1万个 bthread
    std::vector<bthread_t> threads(THREAD_COUNT);
    std::vector<int> task_ids(THREAD_COUNT);

    auto start = std::chrono::high_resolution_clock::now();

    // 批量创建 bthread
    for (int i = 0; i < THREAD_COUNT; ++i) {
        task_ids[i] = i;
        if (bthread_start_background(&threads[i], NULL,
                                   count_task, &task_ids[i]) != 0) {
            LOG(ERROR) << "Fail to create bthread " << i;
        }
    }

    // 等待所有 bthread 完成
    for (int i = 0; i < THREAD_COUNT; ++i) {
        bthread_join(threads[i], NULL);
    }

    auto end = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
        end - start).count();

    LOG(INFO) << "Created " << THREAD_COUNT << " bthreads in "
              << duration << "ms, counter=" << counter.load();
}

3.1.4 bthread 同步原语

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#include <bthread/mutex.h>
#include <bthread/condition_variable.h>

// bthread 专用的同步原语
class BthreadSafeQueue {
private:
    std::queue<int> queue_;
    bthread::Mutex mutex_;
    bthread::ConditionVariable cv_;
    bool stopped_ = false;

public:
    void Push(int value) {
        std::unique_lock<bthread::Mutex> lock(mutex_);
        queue_.push(value);
        cv_.notify_one();
    }

    bool Pop(int* value) {
        std::unique_lock<bthread::Mutex> lock(mutex_);

        // 等待有数据或停止信号
        cv_.wait(lock, [this] {
            return !queue_.empty() || stopped_;
        });

        if (stopped_ && queue_.empty()) {
            return false;
        }

        *value = queue_.front();
        queue_.pop();
        return true;
    }

    void Stop() {
        std::unique_lock<bthread::Mutex> lock(mutex_);
        stopped_ = true;
        cv_.notify_all();
    }
};

// 生产者-消费者示例
void* producer(void* arg) {
    BthreadSafeQueue* queue = (BthreadSafeQueue*)arg;

    for (int i = 0; i < 100; ++i) {
        queue->Push(i);
        bthread_usleep(100);  // 模拟生产耗时
        LOG(INFO) << "Produced: " << i;
    }

    return NULL;
}

void* consumer(void* arg) {
    BthreadSafeQueue* queue = (BthreadSafeQueue*)arg;
    int value;

    while (queue->Pop(&value)) {
        LOG(INFO) << "Consumed: " << value;
        bthread_usleep(200);  // 模拟消费耗时
    }

    return NULL;
}

3.1.5 bthread 本地存储

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <bthread/key.h>

// bthread 本地存储(类似 thread_local)
bthread_key_t g_key;

void InitBthreadLocal() {
    // 创建 bthread 本地存储键
    if (bthread_key_create(&g_key, NULL) != 0) {
        LOG(ERROR) << "Fail to create bthread key";
    }
}

void* worker_with_local_storage(void* arg) {
    int* worker_id = (int*)arg;

    // 设置 bthread 本地数据
    bthread_setspecific(g_key, worker_id);

    // 模拟多层函数调用
    ProcessData();

    return NULL;
}

void ProcessData() {
    // 在任何函数中都可以获取 bthread 本地数据
    int* worker_id = (int*)bthread_getspecific(g_key);
    LOG(INFO) << "Processing data in worker " << *worker_id;

    // 继续调用其他函数
    CallSubFunction();
}

void CallSubFunction() {
    int* worker_id = (int*)bthread_getspecific(g_key);
    LOG(INFO) << "SubFunction called by worker " << *worker_id;
}

3.1.6 bthread 定时器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <bthread/timer.h>

class BthreadTimer {
private:
    bthread_timer_t timer_id_;
    std::atomic<bool> running_{false};

public:
    // 定时器回调函数
    static void TimerCallback(void* arg) {
        BthreadTimer* timer = (BthreadTimer*)arg;
        LOG(INFO) << "Timer triggered at " << time(NULL);

        // 可以在这里执行任何异步任务
        timer->ProcessPeriodicTask();
    }

    void Start(int interval_ms) {
        if (running_.exchange(true)) {
            return;  // 已经在运行
        }

        // 创建周期性定时器
        if (bthread_timer_add(&timer_id_,
                             butil::milliseconds_from_now(interval_ms),
                             TimerCallback, this) != 0) {
            LOG(ERROR) << "Fail to add timer";
            running_ = false;
        }
    }

    void Stop() {
        if (!running_.exchange(false)) {
            return;  // 已经停止
        }

        bthread_timer_del(timer_id_);
    }

private:
    void ProcessPeriodicTask() {
        // 执行周期性任务
        LOG(INFO) << "Executing periodic task in bthread " << bthread_self();

        // 如果还在运行,则安排下次执行
        if (running_) {
            bthread_timer_add(&timer_id_,
                             butil::milliseconds_from_now(1000),
                             TimerCallback, this);
        }
    }
};

3.1.7 bthread 执行队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <bthread/execution_queue.h>

// 高性能的单线程执行队列
class TaskProcessor {
private:
    bthread::ExecutionQueueId<int> queue_id_;

public:
    // 任务处理函数
    static int ProcessTask(void* meta, bthread::TaskIterator<int>& iter) {
        TaskProcessor* processor = (TaskProcessor*)meta;

        // 批量处理任务
        for (; iter; ++iter) {
            processor->HandleSingleTask(*iter);
        }

        return 0;  // 返回0表示继续处理
    }

    bool Init() {
        bthread::ExecutionQueueOptions options;

        if (bthread::execution_queue_start(&queue_id_, &options,
                                         ProcessTask, this) != 0) {
            LOG(ERROR) << "Fail to start execution queue";
            return false;
        }

        return true;
    }

    void SubmitTask(int task_data) {
        // 提交任务到队列(线程安全)
        if (bthread::execution_queue_execute(queue_id_, task_data) != 0) {
            LOG(ERROR) << "Fail to submit task";
        }
    }

    void Shutdown() {
        bthread::execution_queue_stop(queue_id_);
        bthread::execution_queue_join(queue_id_);
    }

private:
    void HandleSingleTask(int task_data) {
        LOG(INFO) << "Processing task: " << task_data
                  << " in bthread " << bthread_self();

        // 模拟任务处理
        bthread_usleep(100);
    }
};

3.1.8 bthread 与 pthread 交互

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// bthread 调用阻塞的 pthread 操作
class BthreadPthreadBridge {
public:
    // 在 bthread 中调用可能阻塞的操作
    static void CallBlockingOperation() {
        // 方法1:直接调用(会阻塞当前 pthread worker)
        // sleep(1);  // 不推荐,会阻塞整个 worker 线程

        // 方法2:使用 bthread_usleep(推荐)
        bthread_usleep(1000000);  // 睡眠1秒,只阻塞当前 bthread

        // 方法3:对于无法替换的阻塞调用,移到 pthread 执行
        std::future<int> result = std::async(std::launch::async, []() {
            sleep(1);  // 在独立的 pthread 中执行
            return 42;
        });

        // 在 bthread 中等待结果
        int value = result.get();
        LOG(INFO) << "Got result: " << value;
    }

    // pthread 中启动 bthread
    static void PthreadStartBthread() {
        std::thread([&] {
            LOG(INFO) << "In pthread: " << std::this_thread::get_id();

            // 在 pthread 中启动 bthread
            bthread_t tid;
            bthread_start_urgent(&tid, NULL, [](void*) -> void* {
                LOG(INFO) << "In bthread: " << bthread_self();
                return NULL;
            }, NULL);

            bthread_join(tid, NULL);
        }).join();
    }
};

3.1.9 bthread 性能测试

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#include <chrono>

class BthreadPerformanceTest {
public:
    // 测试 bthread vs pthread 创建性能
    static void CompareCreationPerformance() {
        const int THREAD_COUNT = 10000;

        // 测试 bthread 创建性能
        auto start = std::chrono::high_resolution_clock::now();

        std::vector<bthread_t> bthreads(THREAD_COUNT);
        for (int i = 0; i < THREAD_COUNT; ++i) {
            bthread_start_background(&bthreads[i], NULL, [](void*) -> void* {
                bthread_usleep(1000);  // 1ms
                return NULL;
            }, NULL);
        }

        for (int i = 0; i < THREAD_COUNT; ++i) {
            bthread_join(bthreads[i], NULL);
        }

        auto bthread_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::high_resolution_clock::now() - start).count();

        // 测试 pthread 创建性能
        start = std::chrono::high_resolution_clock::now();

        std::vector<std::thread> pthreads;
        pthreads.reserve(THREAD_COUNT);

        for (int i = 0; i < THREAD_COUNT; ++i) {
            pthreads.emplace_back([]() {
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            });
        }

        for (auto& t : pthreads) {
            t.join();
        }

        auto pthread_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::high_resolution_clock::now() - start).count();

        LOG(INFO) << "bthread creation: " << bthread_duration << "ms";
        LOG(INFO) << "pthread creation: " << pthread_duration << "ms";
        LOG(INFO) << "bthread is " << (pthread_duration / bthread_duration)
                  << "x faster";
    }

    // 测试 bthread 上下文切换性能
    static void TestContextSwitch() {
        const int SWITCH_COUNT = 1000000;
        bthread::Mutex mutex;

        auto start = std::chrono::high_resolution_clock::now();

        bthread_t tid1, tid2;

        bthread_start_urgent(&tid1, NULL, [](void* arg) -> void* {
            bthread::Mutex* m = (bthread::Mutex*)arg;
            for (int i = 0; i < SWITCH_COUNT / 2; ++i) {
                std::unique_lock<bthread::Mutex> lock(*m);
                bthread_yield();  // 主动让出CPU
            }
            return NULL;
        }, &mutex);

        bthread_start_urgent(&tid2, NULL, [](void* arg) -> void* {
            bthread::Mutex* m = (bthread::Mutex*)arg;
            for (int i = 0; i < SWITCH_COUNT / 2; ++i) {
                std::unique_lock<bthread::Mutex> lock(*m);
                bthread_yield();  // 主动让出CPU
            }
            return NULL;
        }, &mutex);

        bthread_join(tid1, NULL);
        bthread_join(tid2, NULL);

        auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::high_resolution_clock::now() - start).count();

        LOG(INFO) << "Context switches: " << SWITCH_COUNT;
        LOG(INFO) << "Total time: " << duration << "us";
        LOG(INFO) << "Avg per switch: " << (duration / SWITCH_COUNT) << "us";
    }
};

3.1.10 在 RPC 服务中使用 bthread

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// 在 RPC 服务中使用 bthread 进行异步处理
class AsyncRpcService : public example::EchoService {
private:
    TaskProcessor task_processor_;

public:
    void Echo(google::protobuf::RpcController* cntl_base,
              const example::EchoRequest* request,
              example::EchoResponse* response,
              google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);

        // 当前已经在 bthread 中了
        LOG(INFO) << "Request received in bthread " << bthread_self();

        // 方法1:启动新的 bthread 处理耗时操作
        bthread_t worker_tid;
        struct TaskContext {
            std::string message;
            example::EchoResponse* response;
            google::protobuf::Closure* done;
        };

        TaskContext* ctx = new TaskContext{
            request->message(), response, done
        };

        if (bthread_start_background(&worker_tid, NULL,
                                   ProcessInBackground, ctx) == 0) {
            done_guard.release();  // 由 background bthread 负责调用 done
        }

        // 方法2:使用 execution_queue 异步处理
        // task_processor_.SubmitTask(request->message());
    }

private:
    static void* ProcessInBackground(void* arg) {
        std::unique_ptr<TaskContext> ctx((TaskContext*)arg);

        LOG(INFO) << "Processing in background bthread " << bthread_self();

        // 模拟耗时操作
        bthread_usleep(100000);  // 100ms

        // 模拟访问数据库或其他服务
        std::string result = "Processed: " + ctx->message;
        ctx->response->set_message(result);

        // 完成 RPC
        ctx->done->Run();

        return NULL;
    }
};

3.2 Server(服务器)

Server 是 brpc 的服务端核心组件,特点:

  • 自动协议识别:无需指定协议,自动识别客户端协议
  • 多协议支持:一个端口可同时支持多种协议
  • 线程模型:使用 bthread(协程)实现高并发
1
2
3
4
5
6
// 服务器配置示例
brpc::ServerOptions options;
options.num_threads = 16;           // 工作线程数
options.max_concurrency = 0;        // 最大并发请求数,0表示不限制
options.idle_timeout_sec = -1;      // 连接空闲超时,-1表示不超时
options.internal_port = -1;         // 内部监控端口

3.2 Channel(通道)

Channel 代表到服务器的通信线路,特点:

  • 线程安全:可被所有线程共享
  • 连接池:自动管理底层连接
  • 负载均衡:支持多种负载均衡算法
1
2
3
4
5
6
7
// Channel 配置示例
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_BAIDU_STD;  // 协议类型
options.connection_type = brpc::CONNECTION_TYPE_POOLED;  // 连接池
options.timeout_ms = 100;                      // 超时时间
options.max_retry = 3;                         // 最大重试次数
options.backup_request_ms = -1;                // backup request

3.3 Controller

Controller 用于控制单次 RPC 调用:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
brpc::Controller cntl;

// 设置请求参数
cntl.set_timeout_ms(200);
cntl.set_max_retry(2);
cntl.set_log_id(12345);  // 用于追踪

// 发起调用
stub.Method(&cntl, &request, &response, NULL);

// 检查结果
if (cntl.Failed()) {
    LOG(ERROR) << "Error: " << cntl.ErrorText();
    // 获取详细错误信息
    int error_code = cntl.ErrorCode();
    // 获取响应延迟
    int64_t latency = cntl.latency_us();
}

四、进阶特性

4.1 异步调用

异步客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// 使用 Callback
void HandleResponse(brpc::Controller* cntl,
                   example::EchoResponse* response) {
    if (!cntl->Failed()) {
        LOG(INFO) << "Got response: " << response->message();
    }
    delete cntl;
    delete response;
}

// 发起异步调用
brpc::Controller* cntl = new brpc::Controller();
example::EchoResponse* response = new example::EchoResponse();
google::protobuf::Closure* done = brpc::NewCallback(
    &HandleResponse, cntl, response);

stub.Echo(cntl, &request, response, done);

异步服务器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class AsyncEchoService : public example::EchoService {
public:
    void Echo(google::protobuf::RpcController* cntl_base,
              const example::EchoRequest* request,
              example::EchoResponse* response,
              google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);

        // 启动异步任务
        bthread_t tid;
        bthread_start_background(&tid, NULL, ProcessRequest,
                                new Context(request, response, done));

        // 释放 done_guard,让异步任务负责调用 done
        done_guard.release();
    }
};

4.2 流式 RPC

brpc 支持流式 RPC,适用于大数据传输:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 创建 Stream
brpc::StreamId stream_id;
brpc::StreamOptions stream_options;
if (brpc::StreamCreate(&stream_id, cntl, &stream_options) != 0) {
    LOG(ERROR) << "Fail to create stream";
    return;
}

// 写入数据
butil::IOBuf data;
data.append("streaming data");
brpc::StreamWrite(stream_id, data);

// 关闭 Stream
brpc::StreamClose(stream_id);

4.3 负载均衡

brpc 支持多种负载均衡算法:

1
2
3
4
5
6
7
8
9
// 设置负载均衡算法
options.load_balancer_name = "rr";        // round-robin
// options.load_balancer_name = "random";  // 随机
// options.load_balancer_name = "la";      // locality-aware
// options.load_balancer_name = "c_murmurhash";  // 一致性哈希

// 添加多个服务器节点
channel.Init("list://server1:8000,server2:8000,server3:8000",
             &options);

4.4 服务发现

集成服务发现系统:

1
2
3
4
5
6
7
8
// 使用 Naming Service
channel.Init("bns://service_name", &options);

// 或使用 Consul
channel.Init("consul://service_name", &options);

// 或使用文件
channel.Init("file://server_list.txt", &options);

五、性能优化

5.1 连接池优化

1
2
3
4
5
6
7
8
// 短连接(适合请求频率低的场景)
options.connection_type = brpc::CONNECTION_TYPE_SHORT;

// 连接池(推荐,适合大部分场景)
options.connection_type = brpc::CONNECTION_TYPE_POOLED;

// 单连接(适合流式或推送场景)
options.connection_type = brpc::CONNECTION_TYPE_SINGLE;

5.2 并发控制

1
2
3
4
5
6
7
8
// 服务器端并发控制
brpc::ServerOptions server_options;
server_options.max_concurrency = 1000;  // 限制最大并发

// 客户端并发控制
brpc::ChannelOptions channel_options;
channel_options.max_retry = 3;
channel_options.backup_request_ms = 100;  // 100ms 后发送 backup request

5.3 批量请求

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// 使用 ParallelChannel 进行批量请求
brpc::ParallelChannel pchan;
pchan.Init(&options);

// 添加多个子 Channel
for (int i = 0; i < server_count; ++i) {
    brpc::Channel* sub_channel = new brpc::Channel;
    sub_channel->Init(server_addr[i], &options);
    pchan.AddChannel(sub_channel, brpc::OWNS_CHANNEL);
}

// 发送请求会自动分发到所有子 Channel

5.4 性能监控

1
2
3
4
5
6
7
8
// 启用内置服务
server.Start(8000, &options);

// 访问内置监控页面
// http://localhost:8000/status    - 查看服务状态
// http://localhost:8000/vars      - 查看统计变量
// http://localhost:8000/rpcz      - 查看 RPC 详情
// http://localhost:8000/hotspots  - 查看 CPU 热点

六、最佳实践

6.1 错误处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 完善的错误处理
void CallMethod() {
    brpc::Controller cntl;

    // 设置重试策略
    cntl.set_max_retry(3);

    // 调用
    stub.Method(&cntl, &request, &response, NULL);

    if (cntl.Failed()) {
        // 区分错误类型
        if (cntl.ErrorCode() == brpc::ENOSERVICE) {
            LOG(ERROR) << "Service not found";
        } else if (cntl.ErrorCode() == brpc::ENOMETHOD) {
            LOG(ERROR) << "Method not found";
        } else if (cntl.ErrorCode() == brpc::ETIMEOUT) {
            LOG(ERROR) << "Request timeout";
        } else {
            LOG(ERROR) << "Unknown error: " << cntl.ErrorText();
        }
    }
}

6.2 超时设置

1
2
3
4
5
6
7
8
9
// 分层超时设置
// 1. Channel 级别(默认超时)
channel_options.timeout_ms = 1000;

// 2. Controller 级别(单次请求超时)
cntl.set_timeout_ms(500);

// 3. Backup Request(用于长尾优化)
channel_options.backup_request_ms = 200;

6.3 日志和追踪

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 设置 log_id 用于全链路追踪
cntl.set_log_id(GenerateLogId());

// 自定义附件
cntl.request_attachment().append("custom_data");

// 获取详细时间统计
LOG(INFO) << "Latency: " << cntl.latency_us() << "us"
          << " Send: " << cntl.send_latency_us() << "us"
          << " Receive: " << cntl.receive_latency_us() << "us";

6.4 安全通信

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 启用 SSL/TLS
brpc::ChannelOptions options;
options.mutable_ssl_options()->client_cert = "client.crt";
options.mutable_ssl_options()->client_key = "client.key";
options.mutable_ssl_options()->ca_cert = "ca.crt";

// 服务器端 SSL
brpc::ServerOptions server_options;
server_options.mutable_ssl_options()->cert = "server.crt";
server_options.mutable_ssl_options()->key = "server.key";

七、实战案例

7.1 实现限流中间件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class RateLimiter : public brpc::Interceptor {
private:
    std::atomic<int> qps_count{0};
    int max_qps;

public:
    RateLimiter(int max_qps) : max_qps(max_qps) {}

    void BeforeRpc(brpc::Controller* cntl) override {
        if (qps_count.fetch_add(1) > max_qps) {
            cntl->SetFailed(brpc::ELIMIT, "Rate limit exceeded");
            cntl->SendResponse();
        }
    }

    void AfterRpc(brpc::Controller* cntl) override {
        qps_count.fetch_sub(1);
    }
};

// 使用
server.AddInterceptor(new RateLimiter(10000));

7.2 实现熔断器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class CircuitBreaker {
private:
    std::atomic<int> failure_count{0};
    std::atomic<bool> is_open{false};
    int threshold = 10;

public:
    bool ShouldPass() {
        if (is_open.load()) {
            return false;  // 熔断状态,拒绝请求
        }
        return true;
    }

    void OnSuccess() {
        failure_count.store(0);
        is_open.store(false);
    }

    void OnFailure() {
        if (failure_count.fetch_add(1) >= threshold) {
            is_open.store(true);  // 触发熔断
            // 启动定时器,一段时间后尝试恢复
        }
    }
};

7.3 实现请求缓存

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class CachedService : public example::EchoService {
private:
    std::unordered_map<std::string, std::string> cache;
    std::mutex cache_mutex;

public:
    void Echo(google::protobuf::RpcController* cntl_base,
              const example::EchoRequest* request,
              example::EchoResponse* response,
              google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);

        // 查询缓存
        {
            std::lock_guard<std::mutex> lock(cache_mutex);
            auto it = cache.find(request->message());
            if (it != cache.end()) {
                response->set_message(it->second);
                return;
            }
        }

        // 处理请求
        std::string result = ProcessRequest(request->message());
        response->set_message(result);

        // 更新缓存
        {
            std::lock_guard<std::mutex> lock(cache_mutex);
            cache[request->message()] = result;
        }
    }
};

八、故障排查

8.1 常见问题

连接问题

1
2
3
4
5
6
7
// 检查连接状态
if (channel.CheckHealth() != 0) {
    LOG(ERROR) << "Channel is unhealthy";
}

// 获取连接统计
LOG(INFO) << "Connection count: " << channel.ConnectionCount();

性能问题

1
2
3
4
5
6
7
8
9
// 使用内置性能分析工具
// 1. CPU profiling
//    访问 http://server:port/hotspots

// 2. 内存分析
//    访问 http://server:port/heap

// 3. RPC 追踪
//    访问 http://server:port/rpcz

8.2 调试技巧

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 开启详细日志
FLAGS_v = 4;  // 设置日志级别

// 打印请求响应
class DebugInterceptor : public brpc::Interceptor {
    void BeforeRpc(brpc::Controller* cntl) override {
        LOG(INFO) << "Request: " << cntl->request_attachment();
    }

    void AfterRpc(brpc::Controller* cntl) override {
        LOG(INFO) << "Response: " << cntl->response_attachment();
    }
};

九、与其他框架对比

9.1 brpc vs gRPC

特性brpcgRPC
性能极高(QPS 高 10000+)
协议支持丰富(HTTP/H2/Redis/MC等)主要是 HTTP/2
负载均衡内置多种算法需要额外组件
服务发现内置支持需要额外组件
调试工具丰富的内置工具相对较少
学习曲线适中较陡
社区Apache 社区Google 主导

9.2 选择建议

  • 选择 brpc

    • 需要极致性能(高并发、低延迟)
    • 需要支持多种协议
    • 在百度云或类似环境
    • 需要丰富的调试工具
  • 选择 gRPC

    • 跨语言支持更重要
    • 与 Google 生态集成
    • 团队已有 gRPC 经验

十、学习资源

10.1 官方资源

10.2 示例代码

  • example/echo_c++/ - Echo 服务示例
  • example/multi_threaded_echo_c++/ - 多线程示例
  • example/streaming_echo_c++/ - 流式 RPC 示例
  • example/parallel_echo_c++/ - 并行请求示例

10.3 进阶学习路径

  1. 基础阶段(1-2周)

    • 理解 RPC 基本概念
    • 完成 Echo 示例
    • 掌握基本的 Server/Channel 使用
  2. 进阶阶段(2-4周)

    • 学习异步编程模型
    • 掌握流式 RPC
    • 理解负载均衡和服务发现
  3. 高级阶段(1-2月)

    • 源码阅读(bthread 实现)
    • 性能调优实践
    • 自定义协议开发
  4. 专家阶段

    • 参与开源贡献
    • 大规模分布式系统实践
    • 框架二次开发

十一、总结

brpc 作为百度开源的工业级 RPC 框架,在性能、易用性和功能完整性方面都达到了业界领先水平。其优势不仅体现在极致的性能优化上,更在于其完善的生态和丰富的功能特性。

对于需要构建高性能分布式系统的团队,brpc 是一个值得认真考虑的选择。通过本指南的学习,相信你已经对 brpc 有了全面的认识,能够在实际项目中灵活运用。

记住,掌握一个框架的关键在于:

  1. 理解原理:知其然更要知其所以然
  2. 动手实践:从简单示例开始,逐步深入
  3. 阅读源码:深入理解实现细节
  4. 解决问题:在实践中积累经验

祝你在 brpc 的学习之路上越走越远!