一、为什么选择 brpc?#
1.1 使用背景#
在现代互联网架构中,微服务已经成为主流架构模式。当系统被拆分成多个服务后,服务间的高效通信变得至关重要。传统的 HTTP REST API 虽然简单,但在高并发、低延迟场景下性能不足。这时候就需要一个高性能的 RPC 框架。
brpc(better RPC)是百度开源并贡献给 Apache 基金会的工业级 RPC 框架,广泛应用于百度内部的搜索、存储、机器学习、广告、推荐等高性能系统。
1.2 核心优势#
性能优势#
- 极高的吞吐量:单连接可达 800MB/s 吞吐量,多连接模式下可达 2.3GB/s
- 超低延迟:最小化锁竞争,即使在 50万+ QPS 下也几乎看不到锁竞争
- 优于 gRPC:同等条件下,brpc 的 QPS 比 gRPC 高约 10000,且在大包(>8KB)场景下性能优势更明显
架构优势#
- 无 IO/业务线程分离:brpc 智能地将 IO 和业务处理组合,实现更好的并发和效率
- 动态线程管理:根据负载自动调整线程数,每个请求创建一个 bthread,处理完自动结束
- 完全并行化:不同客户端的请求读取和解析完全并行化
易用性优势#
- 协议自动识别:服务器自动检测支持的协议,一个端口可接受多种协议
- 丰富的协议支持:支持 HTTP/HTTPS、H2/gRPC、Redis、Memcached 等多种协议
- 完善的调试工具:内置强大的性能分析和调试工具
二、快速入门#
2.1 环境准备#
Ubuntu/Debian 系统#
1
2
3
4
5
6
7
8
| # 安装基础依赖
sudo apt-get install -y git g++ make libssl-dev libgflags-dev \
libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev
# 安装测试框架(可选)
sudo apt-get install -y cmake libgtest-dev
cd /usr/src/gtest && sudo cmake . && sudo make
sudo mv lib/libgtest* /usr/lib/ && cd -
|
CentOS 系统#
1
2
3
4
5
6
| # 安装 EPEL
sudo yum install epel-release
# 安装依赖
sudo yum install git gcc-c++ make openssl-devel gflags-devel \
protobuf-devel protobuf-compiler leveldb-devel
|
2.2 编译安装 brpc#
1
2
3
4
5
6
7
8
9
10
11
12
13
| # 克隆代码
git clone https://github.com/apache/brpc.git
cd brpc
# 编译
sh config_brpc.sh --headers=/usr/include --libs=/usr/lib
make -j8
# 运行示例
cd example/echo_c++
make
./echo_server &
./echo_client
|
2.3 第一个 brpc 程序#
Step 1: 定义服务接口(echo.proto)#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| syntax = "proto2";
package example;
option cc_generic_services = true;
message EchoRequest {
required string message = 1;
}
message EchoResponse {
required string message = 1;
}
service EchoService {
rpc Echo(EchoRequest) returns (EchoResponse);
}
|
Step 2: 生成代码#
1
| protoc --cpp_out=. echo.proto
|
Step 3: 实现服务端#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| #include <brpc/server.h>
#include "echo.pb.h"
// 实现服务
class EchoServiceImpl : public example::EchoService {
public:
void Echo(google::protobuf::RpcController* cntl_base,
const example::EchoRequest* request,
example::EchoResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
// 业务逻辑
response->set_message(request->message());
LOG(INFO) << "Received: " << request->message();
}
};
int main(int argc, char* argv[]) {
// 创建服务器
brpc::Server server;
// 添加服务
EchoServiceImpl echo_service;
if (server.AddService(&echo_service,
brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
LOG(ERROR) << "Fail to add service";
return -1;
}
// 启动服务器
brpc::ServerOptions options;
options.num_threads = 8; // 工作线程数
if (server.Start(8000, &options) != 0) {
LOG(ERROR) << "Fail to start server";
return -1;
}
// 等待退出
server.RunUntilAskedToQuit();
return 0;
}
|
Step 4: 实现客户端#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| #include <brpc/channel.h>
#include "echo.pb.h"
int main(int argc, char* argv[]) {
// 创建 Channel
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_BAIDU_STD;
options.timeout_ms = 100;
options.max_retry = 3;
if (channel.Init("127.0.0.1:8000", &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}
// 创建服务 stub
example::EchoService_Stub stub(&channel);
// 发送请求
example::EchoRequest request;
example::EchoResponse response;
brpc::Controller cntl;
request.set_message("Hello brpc!");
stub.Echo(&cntl, &request, &response, NULL);
if (!cntl.Failed()) {
LOG(INFO) << "Response: " << response.message();
} else {
LOG(ERROR) << cntl.ErrorText();
}
return 0;
}
|
三、核心概念详解#
3.1 bthread(协程)- brpc 的核心#
3.1.1 什么是 bthread#
bthread 是 brpc 实现的 M:N 线程库,是 brpc 高性能的关键。与传统的 pthread 不同:
- 轻量级:创建和切换开销极小
- 高并发:单机可支持百万级 bthread
- 同步编程:用同步方式写异步代码
3.1.2 基础 bthread 使用#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| #include <bthread/bthread.h>
// 基础 bthread 创建
void* worker_function(void* arg) {
LOG(INFO) << "Running in bthread " << bthread_self();
// 模拟工作
bthread_usleep(1000); // 睡眠 1ms,不会阻塞 pthread
return NULL;
}
int main() {
bthread_t tid;
// 创建 bthread
if (bthread_start_urgent(&tid, NULL, worker_function, NULL) != 0) {
LOG(ERROR) << "Fail to create bthread";
return -1;
}
// 等待 bthread 结束
bthread_join(tid, NULL);
return 0;
}
|
3.1.3 bthread 批量创建示例#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
| #include <vector>
#include <atomic>
std::atomic<int> counter{0};
void* count_task(void* arg) {
int* task_id = (int*)arg;
// 模拟计算任务
for (int i = 0; i < 1000; ++i) {
counter.fetch_add(1);
}
LOG(INFO) << "Task " << *task_id << " completed by bthread "
<< bthread_self();
return NULL;
}
void BatchCreateBthreads() {
const int THREAD_COUNT = 10000; // 创建 1万个 bthread
std::vector<bthread_t> threads(THREAD_COUNT);
std::vector<int> task_ids(THREAD_COUNT);
auto start = std::chrono::high_resolution_clock::now();
// 批量创建 bthread
for (int i = 0; i < THREAD_COUNT; ++i) {
task_ids[i] = i;
if (bthread_start_background(&threads[i], NULL,
count_task, &task_ids[i]) != 0) {
LOG(ERROR) << "Fail to create bthread " << i;
}
}
// 等待所有 bthread 完成
for (int i = 0; i < THREAD_COUNT; ++i) {
bthread_join(threads[i], NULL);
}
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
end - start).count();
LOG(INFO) << "Created " << THREAD_COUNT << " bthreads in "
<< duration << "ms, counter=" << counter.load();
}
|
3.1.4 bthread 同步原语#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
| #include <bthread/mutex.h>
#include <bthread/condition_variable.h>
// bthread 专用的同步原语
class BthreadSafeQueue {
private:
std::queue<int> queue_;
bthread::Mutex mutex_;
bthread::ConditionVariable cv_;
bool stopped_ = false;
public:
void Push(int value) {
std::unique_lock<bthread::Mutex> lock(mutex_);
queue_.push(value);
cv_.notify_one();
}
bool Pop(int* value) {
std::unique_lock<bthread::Mutex> lock(mutex_);
// 等待有数据或停止信号
cv_.wait(lock, [this] {
return !queue_.empty() || stopped_;
});
if (stopped_ && queue_.empty()) {
return false;
}
*value = queue_.front();
queue_.pop();
return true;
}
void Stop() {
std::unique_lock<bthread::Mutex> lock(mutex_);
stopped_ = true;
cv_.notify_all();
}
};
// 生产者-消费者示例
void* producer(void* arg) {
BthreadSafeQueue* queue = (BthreadSafeQueue*)arg;
for (int i = 0; i < 100; ++i) {
queue->Push(i);
bthread_usleep(100); // 模拟生产耗时
LOG(INFO) << "Produced: " << i;
}
return NULL;
}
void* consumer(void* arg) {
BthreadSafeQueue* queue = (BthreadSafeQueue*)arg;
int value;
while (queue->Pop(&value)) {
LOG(INFO) << "Consumed: " << value;
bthread_usleep(200); // 模拟消费耗时
}
return NULL;
}
|
3.1.5 bthread 本地存储#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| #include <bthread/key.h>
// bthread 本地存储(类似 thread_local)
bthread_key_t g_key;
void InitBthreadLocal() {
// 创建 bthread 本地存储键
if (bthread_key_create(&g_key, NULL) != 0) {
LOG(ERROR) << "Fail to create bthread key";
}
}
void* worker_with_local_storage(void* arg) {
int* worker_id = (int*)arg;
// 设置 bthread 本地数据
bthread_setspecific(g_key, worker_id);
// 模拟多层函数调用
ProcessData();
return NULL;
}
void ProcessData() {
// 在任何函数中都可以获取 bthread 本地数据
int* worker_id = (int*)bthread_getspecific(g_key);
LOG(INFO) << "Processing data in worker " << *worker_id;
// 继续调用其他函数
CallSubFunction();
}
void CallSubFunction() {
int* worker_id = (int*)bthread_getspecific(g_key);
LOG(INFO) << "SubFunction called by worker " << *worker_id;
}
|
3.1.6 bthread 定时器#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| #include <bthread/timer.h>
class BthreadTimer {
private:
bthread_timer_t timer_id_;
std::atomic<bool> running_{false};
public:
// 定时器回调函数
static void TimerCallback(void* arg) {
BthreadTimer* timer = (BthreadTimer*)arg;
LOG(INFO) << "Timer triggered at " << time(NULL);
// 可以在这里执行任何异步任务
timer->ProcessPeriodicTask();
}
void Start(int interval_ms) {
if (running_.exchange(true)) {
return; // 已经在运行
}
// 创建周期性定时器
if (bthread_timer_add(&timer_id_,
butil::milliseconds_from_now(interval_ms),
TimerCallback, this) != 0) {
LOG(ERROR) << "Fail to add timer";
running_ = false;
}
}
void Stop() {
if (!running_.exchange(false)) {
return; // 已经停止
}
bthread_timer_del(timer_id_);
}
private:
void ProcessPeriodicTask() {
// 执行周期性任务
LOG(INFO) << "Executing periodic task in bthread " << bthread_self();
// 如果还在运行,则安排下次执行
if (running_) {
bthread_timer_add(&timer_id_,
butil::milliseconds_from_now(1000),
TimerCallback, this);
}
}
};
|
3.1.7 bthread 执行队列#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
| #include <bthread/execution_queue.h>
// 高性能的单线程执行队列
class TaskProcessor {
private:
bthread::ExecutionQueueId<int> queue_id_;
public:
// 任务处理函数
static int ProcessTask(void* meta, bthread::TaskIterator<int>& iter) {
TaskProcessor* processor = (TaskProcessor*)meta;
// 批量处理任务
for (; iter; ++iter) {
processor->HandleSingleTask(*iter);
}
return 0; // 返回0表示继续处理
}
bool Init() {
bthread::ExecutionQueueOptions options;
if (bthread::execution_queue_start(&queue_id_, &options,
ProcessTask, this) != 0) {
LOG(ERROR) << "Fail to start execution queue";
return false;
}
return true;
}
void SubmitTask(int task_data) {
// 提交任务到队列(线程安全)
if (bthread::execution_queue_execute(queue_id_, task_data) != 0) {
LOG(ERROR) << "Fail to submit task";
}
}
void Shutdown() {
bthread::execution_queue_stop(queue_id_);
bthread::execution_queue_join(queue_id_);
}
private:
void HandleSingleTask(int task_data) {
LOG(INFO) << "Processing task: " << task_data
<< " in bthread " << bthread_self();
// 模拟任务处理
bthread_usleep(100);
}
};
|
3.1.8 bthread 与 pthread 交互#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| // bthread 调用阻塞的 pthread 操作
class BthreadPthreadBridge {
public:
// 在 bthread 中调用可能阻塞的操作
static void CallBlockingOperation() {
// 方法1:直接调用(会阻塞当前 pthread worker)
// sleep(1); // 不推荐,会阻塞整个 worker 线程
// 方法2:使用 bthread_usleep(推荐)
bthread_usleep(1000000); // 睡眠1秒,只阻塞当前 bthread
// 方法3:对于无法替换的阻塞调用,移到 pthread 执行
std::future<int> result = std::async(std::launch::async, []() {
sleep(1); // 在独立的 pthread 中执行
return 42;
});
// 在 bthread 中等待结果
int value = result.get();
LOG(INFO) << "Got result: " << value;
}
// pthread 中启动 bthread
static void PthreadStartBthread() {
std::thread([&] {
LOG(INFO) << "In pthread: " << std::this_thread::get_id();
// 在 pthread 中启动 bthread
bthread_t tid;
bthread_start_urgent(&tid, NULL, [](void*) -> void* {
LOG(INFO) << "In bthread: " << bthread_self();
return NULL;
}, NULL);
bthread_join(tid, NULL);
}).join();
}
};
|
3.1.9 bthread 性能测试#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
| #include <chrono>
class BthreadPerformanceTest {
public:
// 测试 bthread vs pthread 创建性能
static void CompareCreationPerformance() {
const int THREAD_COUNT = 10000;
// 测试 bthread 创建性能
auto start = std::chrono::high_resolution_clock::now();
std::vector<bthread_t> bthreads(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; ++i) {
bthread_start_background(&bthreads[i], NULL, [](void*) -> void* {
bthread_usleep(1000); // 1ms
return NULL;
}, NULL);
}
for (int i = 0; i < THREAD_COUNT; ++i) {
bthread_join(bthreads[i], NULL);
}
auto bthread_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start).count();
// 测试 pthread 创建性能
start = std::chrono::high_resolution_clock::now();
std::vector<std::thread> pthreads;
pthreads.reserve(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; ++i) {
pthreads.emplace_back([]() {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
});
}
for (auto& t : pthreads) {
t.join();
}
auto pthread_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start).count();
LOG(INFO) << "bthread creation: " << bthread_duration << "ms";
LOG(INFO) << "pthread creation: " << pthread_duration << "ms";
LOG(INFO) << "bthread is " << (pthread_duration / bthread_duration)
<< "x faster";
}
// 测试 bthread 上下文切换性能
static void TestContextSwitch() {
const int SWITCH_COUNT = 1000000;
bthread::Mutex mutex;
auto start = std::chrono::high_resolution_clock::now();
bthread_t tid1, tid2;
bthread_start_urgent(&tid1, NULL, [](void* arg) -> void* {
bthread::Mutex* m = (bthread::Mutex*)arg;
for (int i = 0; i < SWITCH_COUNT / 2; ++i) {
std::unique_lock<bthread::Mutex> lock(*m);
bthread_yield(); // 主动让出CPU
}
return NULL;
}, &mutex);
bthread_start_urgent(&tid2, NULL, [](void* arg) -> void* {
bthread::Mutex* m = (bthread::Mutex*)arg;
for (int i = 0; i < SWITCH_COUNT / 2; ++i) {
std::unique_lock<bthread::Mutex> lock(*m);
bthread_yield(); // 主动让出CPU
}
return NULL;
}, &mutex);
bthread_join(tid1, NULL);
bthread_join(tid2, NULL);
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now() - start).count();
LOG(INFO) << "Context switches: " << SWITCH_COUNT;
LOG(INFO) << "Total time: " << duration << "us";
LOG(INFO) << "Avg per switch: " << (duration / SWITCH_COUNT) << "us";
}
};
|
3.1.10 在 RPC 服务中使用 bthread#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
| // 在 RPC 服务中使用 bthread 进行异步处理
class AsyncRpcService : public example::EchoService {
private:
TaskProcessor task_processor_;
public:
void Echo(google::protobuf::RpcController* cntl_base,
const example::EchoRequest* request,
example::EchoResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
// 当前已经在 bthread 中了
LOG(INFO) << "Request received in bthread " << bthread_self();
// 方法1:启动新的 bthread 处理耗时操作
bthread_t worker_tid;
struct TaskContext {
std::string message;
example::EchoResponse* response;
google::protobuf::Closure* done;
};
TaskContext* ctx = new TaskContext{
request->message(), response, done
};
if (bthread_start_background(&worker_tid, NULL,
ProcessInBackground, ctx) == 0) {
done_guard.release(); // 由 background bthread 负责调用 done
}
// 方法2:使用 execution_queue 异步处理
// task_processor_.SubmitTask(request->message());
}
private:
static void* ProcessInBackground(void* arg) {
std::unique_ptr<TaskContext> ctx((TaskContext*)arg);
LOG(INFO) << "Processing in background bthread " << bthread_self();
// 模拟耗时操作
bthread_usleep(100000); // 100ms
// 模拟访问数据库或其他服务
std::string result = "Processed: " + ctx->message;
ctx->response->set_message(result);
// 完成 RPC
ctx->done->Run();
return NULL;
}
};
|
3.2 Server(服务器)#
Server 是 brpc 的服务端核心组件,特点:
- 自动协议识别:无需指定协议,自动识别客户端协议
- 多协议支持:一个端口可同时支持多种协议
- 线程模型:使用 bthread(协程)实现高并发
1
2
3
4
5
6
| // 服务器配置示例
brpc::ServerOptions options;
options.num_threads = 16; // 工作线程数
options.max_concurrency = 0; // 最大并发请求数,0表示不限制
options.idle_timeout_sec = -1; // 连接空闲超时,-1表示不超时
options.internal_port = -1; // 内部监控端口
|
3.2 Channel(通道)#
Channel 代表到服务器的通信线路,特点:
- 线程安全:可被所有线程共享
- 连接池:自动管理底层连接
- 负载均衡:支持多种负载均衡算法
1
2
3
4
5
6
7
| // Channel 配置示例
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_BAIDU_STD; // 协议类型
options.connection_type = brpc::CONNECTION_TYPE_POOLED; // 连接池
options.timeout_ms = 100; // 超时时间
options.max_retry = 3; // 最大重试次数
options.backup_request_ms = -1; // backup request
|
3.3 Controller#
Controller 用于控制单次 RPC 调用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| brpc::Controller cntl;
// 设置请求参数
cntl.set_timeout_ms(200);
cntl.set_max_retry(2);
cntl.set_log_id(12345); // 用于追踪
// 发起调用
stub.Method(&cntl, &request, &response, NULL);
// 检查结果
if (cntl.Failed()) {
LOG(ERROR) << "Error: " << cntl.ErrorText();
// 获取详细错误信息
int error_code = cntl.ErrorCode();
// 获取响应延迟
int64_t latency = cntl.latency_us();
}
|
四、进阶特性#
4.1 异步调用#
异步客户端#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| // 使用 Callback
void HandleResponse(brpc::Controller* cntl,
example::EchoResponse* response) {
if (!cntl->Failed()) {
LOG(INFO) << "Got response: " << response->message();
}
delete cntl;
delete response;
}
// 发起异步调用
brpc::Controller* cntl = new brpc::Controller();
example::EchoResponse* response = new example::EchoResponse();
google::protobuf::Closure* done = brpc::NewCallback(
&HandleResponse, cntl, response);
stub.Echo(cntl, &request, response, done);
|
异步服务器#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| class AsyncEchoService : public example::EchoService {
public:
void Echo(google::protobuf::RpcController* cntl_base,
const example::EchoRequest* request,
example::EchoResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
// 启动异步任务
bthread_t tid;
bthread_start_background(&tid, NULL, ProcessRequest,
new Context(request, response, done));
// 释放 done_guard,让异步任务负责调用 done
done_guard.release();
}
};
|
4.2 流式 RPC#
brpc 支持流式 RPC,适用于大数据传输:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| // 创建 Stream
brpc::StreamId stream_id;
brpc::StreamOptions stream_options;
if (brpc::StreamCreate(&stream_id, cntl, &stream_options) != 0) {
LOG(ERROR) << "Fail to create stream";
return;
}
// 写入数据
butil::IOBuf data;
data.append("streaming data");
brpc::StreamWrite(stream_id, data);
// 关闭 Stream
brpc::StreamClose(stream_id);
|
4.3 负载均衡#
brpc 支持多种负载均衡算法:
1
2
3
4
5
6
7
8
9
| // 设置负载均衡算法
options.load_balancer_name = "rr"; // round-robin
// options.load_balancer_name = "random"; // 随机
// options.load_balancer_name = "la"; // locality-aware
// options.load_balancer_name = "c_murmurhash"; // 一致性哈希
// 添加多个服务器节点
channel.Init("list://server1:8000,server2:8000,server3:8000",
&options);
|
4.4 服务发现#
集成服务发现系统:
1
2
3
4
5
6
7
8
| // 使用 Naming Service
channel.Init("bns://service_name", &options);
// 或使用 Consul
channel.Init("consul://service_name", &options);
// 或使用文件
channel.Init("file://server_list.txt", &options);
|
五、性能优化#
5.1 连接池优化#
1
2
3
4
5
6
7
8
| // 短连接(适合请求频率低的场景)
options.connection_type = brpc::CONNECTION_TYPE_SHORT;
// 连接池(推荐,适合大部分场景)
options.connection_type = brpc::CONNECTION_TYPE_POOLED;
// 单连接(适合流式或推送场景)
options.connection_type = brpc::CONNECTION_TYPE_SINGLE;
|
5.2 并发控制#
1
2
3
4
5
6
7
8
| // 服务器端并发控制
brpc::ServerOptions server_options;
server_options.max_concurrency = 1000; // 限制最大并发
// 客户端并发控制
brpc::ChannelOptions channel_options;
channel_options.max_retry = 3;
channel_options.backup_request_ms = 100; // 100ms 后发送 backup request
|
5.3 批量请求#
1
2
3
4
5
6
7
8
9
10
11
12
| // 使用 ParallelChannel 进行批量请求
brpc::ParallelChannel pchan;
pchan.Init(&options);
// 添加多个子 Channel
for (int i = 0; i < server_count; ++i) {
brpc::Channel* sub_channel = new brpc::Channel;
sub_channel->Init(server_addr[i], &options);
pchan.AddChannel(sub_channel, brpc::OWNS_CHANNEL);
}
// 发送请求会自动分发到所有子 Channel
|
5.4 性能监控#
1
2
3
4
5
6
7
8
| // 启用内置服务
server.Start(8000, &options);
// 访问内置监控页面
// http://localhost:8000/status - 查看服务状态
// http://localhost:8000/vars - 查看统计变量
// http://localhost:8000/rpcz - 查看 RPC 详情
// http://localhost:8000/hotspots - 查看 CPU 热点
|
六、最佳实践#
6.1 错误处理#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| // 完善的错误处理
void CallMethod() {
brpc::Controller cntl;
// 设置重试策略
cntl.set_max_retry(3);
// 调用
stub.Method(&cntl, &request, &response, NULL);
if (cntl.Failed()) {
// 区分错误类型
if (cntl.ErrorCode() == brpc::ENOSERVICE) {
LOG(ERROR) << "Service not found";
} else if (cntl.ErrorCode() == brpc::ENOMETHOD) {
LOG(ERROR) << "Method not found";
} else if (cntl.ErrorCode() == brpc::ETIMEOUT) {
LOG(ERROR) << "Request timeout";
} else {
LOG(ERROR) << "Unknown error: " << cntl.ErrorText();
}
}
}
|
6.2 超时设置#
1
2
3
4
5
6
7
8
9
| // 分层超时设置
// 1. Channel 级别(默认超时)
channel_options.timeout_ms = 1000;
// 2. Controller 级别(单次请求超时)
cntl.set_timeout_ms(500);
// 3. Backup Request(用于长尾优化)
channel_options.backup_request_ms = 200;
|
6.3 日志和追踪#
1
2
3
4
5
6
7
8
9
10
| // 设置 log_id 用于全链路追踪
cntl.set_log_id(GenerateLogId());
// 自定义附件
cntl.request_attachment().append("custom_data");
// 获取详细时间统计
LOG(INFO) << "Latency: " << cntl.latency_us() << "us"
<< " Send: " << cntl.send_latency_us() << "us"
<< " Receive: " << cntl.receive_latency_us() << "us";
|
6.4 安全通信#
1
2
3
4
5
6
7
8
9
10
| // 启用 SSL/TLS
brpc::ChannelOptions options;
options.mutable_ssl_options()->client_cert = "client.crt";
options.mutable_ssl_options()->client_key = "client.key";
options.mutable_ssl_options()->ca_cert = "ca.crt";
// 服务器端 SSL
brpc::ServerOptions server_options;
server_options.mutable_ssl_options()->cert = "server.crt";
server_options.mutable_ssl_options()->key = "server.key";
|
七、实战案例#
7.1 实现限流中间件#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| class RateLimiter : public brpc::Interceptor {
private:
std::atomic<int> qps_count{0};
int max_qps;
public:
RateLimiter(int max_qps) : max_qps(max_qps) {}
void BeforeRpc(brpc::Controller* cntl) override {
if (qps_count.fetch_add(1) > max_qps) {
cntl->SetFailed(brpc::ELIMIT, "Rate limit exceeded");
cntl->SendResponse();
}
}
void AfterRpc(brpc::Controller* cntl) override {
qps_count.fetch_sub(1);
}
};
// 使用
server.AddInterceptor(new RateLimiter(10000));
|
7.2 实现熔断器#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| class CircuitBreaker {
private:
std::atomic<int> failure_count{0};
std::atomic<bool> is_open{false};
int threshold = 10;
public:
bool ShouldPass() {
if (is_open.load()) {
return false; // 熔断状态,拒绝请求
}
return true;
}
void OnSuccess() {
failure_count.store(0);
is_open.store(false);
}
void OnFailure() {
if (failure_count.fetch_add(1) >= threshold) {
is_open.store(true); // 触发熔断
// 启动定时器,一段时间后尝试恢复
}
}
};
|
7.3 实现请求缓存#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
| class CachedService : public example::EchoService {
private:
std::unordered_map<std::string, std::string> cache;
std::mutex cache_mutex;
public:
void Echo(google::protobuf::RpcController* cntl_base,
const example::EchoRequest* request,
example::EchoResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
// 查询缓存
{
std::lock_guard<std::mutex> lock(cache_mutex);
auto it = cache.find(request->message());
if (it != cache.end()) {
response->set_message(it->second);
return;
}
}
// 处理请求
std::string result = ProcessRequest(request->message());
response->set_message(result);
// 更新缓存
{
std::lock_guard<std::mutex> lock(cache_mutex);
cache[request->message()] = result;
}
}
};
|
八、故障排查#
8.1 常见问题#
连接问题#
1
2
3
4
5
6
7
| // 检查连接状态
if (channel.CheckHealth() != 0) {
LOG(ERROR) << "Channel is unhealthy";
}
// 获取连接统计
LOG(INFO) << "Connection count: " << channel.ConnectionCount();
|
性能问题#
1
2
3
4
5
6
7
8
9
| // 使用内置性能分析工具
// 1. CPU profiling
// 访问 http://server:port/hotspots
// 2. 内存分析
// 访问 http://server:port/heap
// 3. RPC 追踪
// 访问 http://server:port/rpcz
|
8.2 调试技巧#
1
2
3
4
5
6
7
8
9
10
11
12
13
| // 开启详细日志
FLAGS_v = 4; // 设置日志级别
// 打印请求响应
class DebugInterceptor : public brpc::Interceptor {
void BeforeRpc(brpc::Controller* cntl) override {
LOG(INFO) << "Request: " << cntl->request_attachment();
}
void AfterRpc(brpc::Controller* cntl) override {
LOG(INFO) << "Response: " << cntl->response_attachment();
}
};
|
九、与其他框架对比#
9.1 brpc vs gRPC#
| 特性 | brpc | gRPC |
|---|
| 性能 | 极高(QPS 高 10000+) | 高 |
| 协议支持 | 丰富(HTTP/H2/Redis/MC等) | 主要是 HTTP/2 |
| 负载均衡 | 内置多种算法 | 需要额外组件 |
| 服务发现 | 内置支持 | 需要额外组件 |
| 调试工具 | 丰富的内置工具 | 相对较少 |
| 学习曲线 | 适中 | 较陡 |
| 社区 | Apache 社区 | Google 主导 |
9.2 选择建议#
选择 brpc:
- 需要极致性能(高并发、低延迟)
- 需要支持多种协议
- 在百度云或类似环境
- 需要丰富的调试工具
选择 gRPC:
- 跨语言支持更重要
- 与 Google 生态集成
- 团队已有 gRPC 经验
十、学习资源#
10.1 官方资源#
10.2 示例代码#
example/echo_c++/ - Echo 服务示例example/multi_threaded_echo_c++/ - 多线程示例example/streaming_echo_c++/ - 流式 RPC 示例example/parallel_echo_c++/ - 并行请求示例
10.3 进阶学习路径#
基础阶段(1-2周)
- 理解 RPC 基本概念
- 完成 Echo 示例
- 掌握基本的 Server/Channel 使用
进阶阶段(2-4周)
- 学习异步编程模型
- 掌握流式 RPC
- 理解负载均衡和服务发现
高级阶段(1-2月)
- 源码阅读(bthread 实现)
- 性能调优实践
- 自定义协议开发
专家阶段
十一、总结#
brpc 作为百度开源的工业级 RPC 框架,在性能、易用性和功能完整性方面都达到了业界领先水平。其优势不仅体现在极致的性能优化上,更在于其完善的生态和丰富的功能特性。
对于需要构建高性能分布式系统的团队,brpc 是一个值得认真考虑的选择。通过本指南的学习,相信你已经对 brpc 有了全面的认识,能够在实际项目中灵活运用。
记住,掌握一个框架的关键在于:
- 理解原理:知其然更要知其所以然
- 动手实践:从简单示例开始,逐步深入
- 阅读源码:深入理解实现细节
- 解决问题:在实践中积累经验
祝你在 brpc 的学习之路上越走越远!