作者:jiangtzhang,腾讯WXG后台开发工程师
| 导语 性能优化,长路漫漫啊。
在过去的一段时间做了些性能优化的事情,在这里记录一下遇到的一些性能优化case,也请各位大佬多多指教。
1. 使用Class代替ProtoBuf协议
在大量调用的API代码中尽量的不要用ProtoBuf协议,最好使用C++的Class来代替。因为ProtoBuf采用的是Arena内存分配器策略,有些场景会比C++的Class内存管理复杂,当有大量内存分配和释放的时候会比Class的性能差很多。而且Protobuf会不断的分配和回收小内存对象,持续的分配和删除小内存对象导致产生内存碎片,降低程序的内存使用率,尤其是当协议中包含string类型的时候,性能差距可能有几倍。对于包含了很多小对象的protobuf message,析构过程会导致堆上分配了许多对象,从而会变得复杂,导致析构函数执行速度变慢。
下面给出一个实际开发中使用Class代替Protobuf的例子:
ProtoBuf协议:
message Param { optional string name = 1; optional string value = 2; } message ParamHit { enum Type { Unknown = 0; WhiteList = 1; LaunchLayer = 2; BaseAB = 3; DefaultParam = 4; } optional Param param = 1; optional uint64 group_id = 2; optional uint64 expt_id = 3; optional uint64 launch_layer_id = 4; optional string hash_key_used = 5; optional string hash_key_val_used = 6; optional Type type = 7; optional bool is_hit_mbox = 8; }
改写的Class
class ParamHitInfo { public: class Param { public: Param() = default; ~Param() = default; const std::string & name() const { return name_; } void set_name(const std::string &name) { name_ = name; } void clear_name() { name_.clear(); } const std::string & value() const { return value_; } void set_value(const std::string &value) { value_ = value; } void clear_value() { value_.clear(); } void Clear() { clear_name(); clear_value(); } private: std::string name_, value_; }; ParamHitInfo() { expt_id_ = group_id_ = launch_layer_id_ = 0u; is_hit_mbox_ = false; type_ = ParamHit::Unknown; } ~ParamHitInfo() = default; void Clear() { clear_group_id(); clear_expt_id(); clear_launch_layer_id(); clear_is_hit_mbox(); clear_hash_key_used(); clear_hash_key_val_used(); clear_type(); param_.Clear(); } const ParamHit ToProtobuf() const { ParamHit ans; ans.set_expt_id(expt_id_); ans.set_group_id(group_id_); ans.set_launch_layer_id(launch_layer_id_); ans.set_is_hit_mbox(is_hit_mbox_); ans.set_hash_key_used(hash_key_used_); ans.set_hash_key_val_used(hash_key_val_used_); ans.set_type(type_); ans.mutable_param()->set_name(param_.name()); ans.mutable_param()->set_value(param_.value()); return ans; } uint64_t group_id() const { return group_id_; } void set_group_id(const uint64_t group_id) { group_id_ = group_id; } void clear_group_id() { group_id_ = 0u; } uint64_t expt_id() const { return expt_id_; } void set_expt_id(const uint64_t expt_id) { expt_id_ = expt_id; } void clear_expt_id() { expt_id_ = 0u; } uint64_t launch_layer_id() const { return launch_layer_id_; } void set_launch_layer_id(const uint64_t launch_layer_id) { launch_layer_id_ = launch_layer_id; } void clear_launch_layer_id() { launch_layer_id_ = 0u; } bool is_hit_mbox() const { return is_hit_mbox_; } void set_is_hit_mbox(const bool is_hit_mbox) { is_hit_mbox_ = is_hit_mbox; } void clear_is_hit_mbox() { is_hit_mbox_ = false; } const std::string & hash_key_used() const { return hash_key_used_; } void set_hash_key_used(const std::string &hash_key_used) { hash_key_used_ = hash_key_used; } void clear_hash_key_used() { hash_key_used_.clear(); } const std::string & hash_key_val_used() const { return hash_key_val_used_; } void set_hash_key_val_used(const std::string &hash_key_val_used) { hash_key_val_used_ = hash_key_val_used; } void clear_hash_key_val_used() { hash_key_val_used_.clear(); } ParamHit_Type type() const { return type_; } void set_type(const ParamHit_Type type) { type_ = type; } void clear_type() { type_ = ParamHit::Unknown; } const Param & param() const { return param_; } Param * mutable_param() { return ¶m_; } std::string ShortDebugString() const { std::string ans = "type: " + std::to_string(type_); ans.append(", group_id: ").append(std::to_string(group_id_)); ans.append(", expt_id: ").append(std::to_string(expt_id_)); ans.append(", launch_layer_id: ").append(std::to_string(launch_layer_id_)); ans.append(", hash_key_used: ").append(hash_key_used_); ans.append(", hash_key_val_used: ").append(hash_key_val_used_); ans.append(", param_name: ").append(param_.name()); ans.append(", param_val: ").append(param_.value()); ans.append(", is_hit_mbox: ").append(std::to_string(is_hit_mbox_)); return ans; } int ByteSize() { int ans = 0; ans += sizeof(uint64_t) * 3 + sizeof(bool) + sizeof(ParamHit_Type); ans += hash_key_used_.size() + hash_key_val_used_.size() + param_.name().size() + param_.value().size(); return ans; } private: ParamHit_Type type_; uint64_t group_id_, expt_id_, launch_layer_id_; std::string hash_key_used_, hash_key_val_used_; bool is_hit_mbox_; Param param_; };
性能测试代码:
TEST(ParamHitDestructorPerf, test) { vector<ParamHit> hits; vector<ParamHitInfo> hit_infos; const int hit_cnts = 1000; vector<pair<string, string>> params; for (int i=0; i<hit_cnts; ++i) { string name = "name: " + to_string(i); string val; int n = 200; val.resize(n); for (int i=0; i<n; ++i) val[i] = (i%10 + 'a'); params.push_back(make_pair(name, val)); } int uin_start = 12345645; for (int i=0; i<hit_cnts; ++i) { ParamHit hit; hit.set_expt_id(i + uin_start); hit.set_group_id(i + 1 + uin_start); hit.set_type(ParamHit::BaseAB); hit.set_is_hit_mbox(false); hit.set_hash_key_used("uin_bytes"); hit.set_hash_key_val_used(BusinessUtil::UInt64ToLittleEndianBytes(i)); auto p = hit.mutable_param(); p->set_name(params[i].first); p->set_value(params[i].second); hits.emplace_back(std::move(hit)); } for (int i=0; i<hit_cnts; ++i) { ParamHitInfo hit; hit.set_expt_id(i + uin_start); hit.set_group_id(i + 1 + uin_start); hit.set_type(ParamHit::BaseAB); hit.set_is_hit_mbox(false); hit.set_hash_key_used("uin_bytes"); hit.set_hash_key_val_used(BusinessUtil::UInt64ToLittleEndianBytes(i)); auto p = hit.mutable_param(); p->set_name(params[i].first); p->set_value(params[i].second); hit_infos.emplace_back(std::move(hit)); } int kRuns = 1000; chrono::high_resolution_clock::time_point t1 = chrono::high_resolution_clock::now(); { for (int i=0; i<kRuns; ++i) { for (auto &&hit: hits) { auto tmp = hit; } } } chrono::high_resolution_clock::time_point t2 = chrono::high_resolution_clock::now(); auto time_span = chrono::duration_cast<chrono::milliseconds>(t2 - t1); std::cerr << "ParamHit_PB Destructor kRuns: " << kRuns << " hit_cnts: " << hit_cnts << " cost: " << time_span.count() << "ms\n"; t1 = chrono::high_resolution_clock::now(); { for (int i=0; i<kRuns; ++i) { for (auto &&hit: hit_infos) { auto tmp = hit; } } } t2 = chrono::high_resolution_clock::now(); time_span = chrono::duration_cast<chrono::milliseconds>(t2 - t1); std::cerr << "ParamHitInfo_Class Destructor kRuns: " << kRuns << " hit_cnts: " << hit_cnts << " cost: " << time_span.count() << "ms\n"; }
性能对比结果:
可以看到使用c++的Class相比于ProtoBuf可以提升3倍的性能 。
2. 使用Cache Friendly的数据结构
这里想先抛出一个问题:使用哈希表的查找一定比使用数组的查找快吗?
Q:理论上来说哈希表的查找复杂度是O(1),数组的查找复杂度是O(n),那么是不是可以得到一个结论就是说哈希表的查找速度一定比数组快呢?
A:其实是不一定的,由于数组具有较高的缓存局部性,可提高CPU缓存的命中率,所以在有些场景下数组的查找效率要远远高于哈希表。
这里给出一个常见操作耗时的数据(2020年):
下面也给出一个项目中的使用Cache Friendly优化的例子:
优化前的数据结构:
class HitContext { public: inline void update_hash_key(const std::string &key, const std::string &val) { hash_keys_[key] = val; } inline const std::string * search_hash_key(const std::string &key) const { auto it = hash_keys_.find(key); return it != hash_keys_.end() ? &(it->second) : nullptr; } private: Context context_; std::unordered_map<std::string, std::string> hash_keys_; };
优化后的数据结构:
class HitContext { public: inline void update_hash_key(const std::string &key, const std::string &val) { if (Misc::IsSnsHashKey(key)) { auto sns_id = Misc::FastAtoi(key.c_str()+Misc::SnsHashKeyPrefix().size()); sns_hash_keys_.emplace_back(sns_id, Misc::LittleEndianBytesToUInt32(val)); return; } hash_keys_[key] = val; } inline void update_hash_key(const std::string &key, const uint32_t val) { if (Misc::IsSnsHashKey(key)) { auto sns_id = Misc::FastAtoi(key.c_str()+Misc::SnsHashKeyPrefix().size()); sns_hash_keys_.emplace_back(sns_id, val); return; } hash_keys_[key] = Misc::UInt32ToLittleEndianBytes(val); } inline const std::string search_hash_key(const std::string &key, bool &find) const { if (Misc::IsSnsHashKey(key)) { auto sns_id = Misc::FastAtoi(key.c_str()+Misc::SnsHashKeyPrefix().size()); auto it = std::find_if(sns_hash_keys_.rbegin(), sns_hash_keys_.rend(), [sns_id](const std::pair<uint32_t, uint32_t> &v) { return v.first == sns_id; }); find = it != sns_hash_keys_.rend(); return find ? Misc::UInt32ToLittleEndianBytes(it->second) : ""; } auto it = hash_keys_.find(key); find = it != hash_keys_.end(); return find ? it->second : ""; } private: Context context_; std::unordered_map<std::string, std::string> hash_keys_; std::vector<std::pair<uint32_t, uint32_t>> sns_hash_keys_; };
性能测试代码
TEST(HitContext, test) { const int keycnt = 264; std::vector<std::string> keys, vals; for (int j = 0; j < keycnt; ++j) { auto key = j+21324; auto val = j+94512454; keys.push_back("sns"+std::to_string(key)); vals.push_back(std::to_string(val)); } const int kRuns = 1000; std::unordered_map<uint32_t, uint64_t> hash_keys; chrono::high_resolution_clock::time_point t1 = chrono::high_resolution_clock::now(); for (int i = 0; i < kRuns; ++i) { HitContext1 ctx; for (int j = 0; j < keycnt; ++j) { ctx.update_hash_key(keys[j], vals[j]); } for (int j=0; j<keycnt; ++j) { auto val = ctx.search_hash_key(keys[j]); if (!val) assert(0); } } chrono::high_resolution_clock::time_point t2 = chrono::high_resolution_clock::now(); auto time_span = chrono::duration_cast<chrono::microseconds>(t2 - t1); std::cerr << "HashTable Hitcontext cost: " << time_span.count() << "us" << std::endl; hash_keys.clear(); t1 = chrono::high_resolution_clock::now(); for (int i = 0; i < kRuns; ++i) { HitContext2 ctx; for (int j = 0; j < keycnt; ++j) { ctx.update_hash_key(keys[j], vals[j]); } for (int j=0; j<keycnt; ++j) { bool find = false; auto val = ctx.search_hash_key(keys[j], find); if (!find) assert(0); } } t2 = chrono::high_resolution_clock::now(); time_span = chrono::duration_cast<chrono::microseconds>(t2 - t1); std::cerr << "Vector HitContext cost: " << time_span.count() << "us" << std::endl; }
性能对比结果:
3. 使用jemalloc/tcmalloc代替普通的malloc方式
由于代码中大量的使用了C++的STL,所以会出现以下几种缺点:
(1). 内存碎片:频繁分配和释放不同大小的对象,可能导致内存碎片,降低内存的使用效率。
(2). Cache不友好。而且STL的普通内存分配器分散了对象的内存地址,降低了数据的缓存命中率
(3). 并发差。STL的默认内存分配器可能使用全局锁,相当于给加了一把大锁,在多线程环境下性能表现很差。
目前在我们的代码中加jemalloc还是很方便的,就是在所编译的target中加下依赖就好了,比如:
使用jemalloc与不使用jemalloc前后性能对比(这里的测试场景是在loadbusiness的时候,具体涉及到了一些业务代码)
可以发现使用jemalloc可以提升20%多的性能,还是优化了很大的,很小的开发成本(只需要加一个编译依赖)带来不错的收益。
4. 使用无锁数据结构
在过去项目开发的时候使用过一种双buffer的无锁数据结构,之所以使用双buffer是因为API有大约26亿/s的调用量,这么高的调用量对性能的要求是很高的。数据结构的定义:
struct expt_api_new_shm { void *p_shm_data; // header volatile int *p_mem_switch; // 0:uninit. 1:mem 1 on server. 2:mem 2 on server uint32_t *p_crc_sum; // data expt_new_context* p_new_context; parameter2business* p_param2business; char* p_business_cache; HashTableWithCache hash_table; //多级哈希表 };
常用的几个函数:
int InitExptNewShmData(expt_api_new_shm *pstShmData, void *pData) { int ptr_offset = EXPT_NEW_SHM_HEADER_SIZE; pstShmData->p_shm_data = pData; pstShmData->p_mem_switch = MAKE_PTR(volatile int *, pData, 0); pstShmData->p_crc_sum = MAKE_PTR(uint32_t *, pData, 4); pstShmData->p_new_context = (expt_new_context *)((uint8_t *)pstShmData->p_shm_data + ptr_offset); pstShmData->p_param2business = (parameter2business *)((uint8_t *)pstShmData->p_shm_data + ptr_offset + EXPT_NEW_SHM_OFFSET_0); pstShmData->p_business_cache = (char *)((uint8_t *)pstShmData->p_shm_data + ptr_offset + EXPT_NEW_SHM_OFFSET_1); size_t node_size = sizeof(APICacheItem), row_cnt = sizeof(auModsInCache)/sizeof(size_t); size_t hash_tbl_size = CalHashTableWithCacheSize(node_size, row_cnt, auModsInCache); pstShmData->hash_table.pTable = (void *)((uint8_t *)pstShmData->p_shm_data + EXPT_NEW_SHM_SIZE - hash_tbl_size); int ret = HashTableWithCacheInit(&pstShmData->hash_table, hash_tbl_size, node_size, row_cnt, auModsInCache); return ret; } int ResetExptNewShmData(expt_api_new_shm *pstShmData) { int iOffset = 0; if (*pstShmData->p_mem_switch <= 1) { iOffset = 0; } else if (*pstShmData->p_mem_switch > 1) { iOffset = EXPT_NEW_SHM_DATA_SIZE; } void *ptrData = MAKE_PTR(void *, pstShmData->p_shm_data, EXPT_NEW_SHM_HEADER_SIZE + iOffset); memset(ptrData, 0, EXPT_NEW_SHM_DATA_SIZE); return 0; } int ResetExptNewShmHeader(expt_api_new_shm *pstShmData) { memset(pstShmData->p_shm_data, 0, EXPT_NEW_SHM_HEADER_SIZE); return 0; } void SwitchNewShmMemToWrite(expt_api_new_shm *pstShmData) { int iSwitchOffset = EXPT_NEW_SHM_DATA_SIZE * ((*pstShmData->p_mem_switch <= 1 ? 0 : 1)); int ptr_offset = EXPT_NEW_SHM_HEADER_SIZE + iSwitchOffset; pstShmData->p_new_context = (expt_new_context *)((uint8_t *)pstShmData->p_shm_data + ptr_offset); pstShmData->p_param2business = (parameter2business *)((uint8_t *)pstShmData->p_shm_data + ptr_offset + EXPT_NEW_SHM_OFFSET_0); pstShmData->p_business_cache = (char *)((uint8_t *)pstShmData->p_shm_data + ptr_offset + EXPT_NEW_SHM_OFFSET_1); } void SwitchNewShmMemToWriteDone(expt_api_new_shm *pstShmData) { if (*pstShmData->p_mem_switch <= 1) *pstShmData->p_mem_switch = 2; else *pstShmData->p_mem_switch = 1; } void SwitchNewShmMemToRead(expt_api_new_shm *pstShmData) { int iSwitchOffset = EXPT_NEW_SHM_DATA_SIZE * ((*pstShmData->p_mem_switch <= 1 ? 1 : 0)); int ptr_offset = EXPT_NEW_SHM_HEADER_SIZE + iSwitchOffset; pstShmData->p_new_context = (expt_new_context *)((uint8_t *)pstShmData->p_shm_data + ptr_offset); pstShmData->p_param2business = (parameter2business *)((uint8_t *)pstShmData->p_shm_data + ptr_offset + EXPT_NEW_SHM_OFFSET_0); pstShmData->p_business_cache = (char *)((uint8_t *)pstShmData->p_shm_data + ptr_offset + EXPT_NEW_SHM_OFFSET_1); }
双buffer的工作原理就是:设置两个buffer,一个用于读,另一个用于写。
(a)初始化这两个buffer为空,调用InitExptNewShmData函数。
(b)对于写操作,先准备数据,即调用SwitchNewShmMemToWrite函数,等数据准备完(即写完相应的数据),然后调用SwitchNewShmMemToWriteDone函数,完成指针的切换。
(c)对于读操作,线程从读buffer读取数据,调用SwitchNewShmMemToRead函数。
我们平台的场景主要是读,而且由于拉取实验配置采用的都是增量的拉取方式,所以配置的改变也不是很频繁,也就很少有写操作的出现。采用双buffer无锁数据结构的优势在于可以提高并发性能,由于读写操作在不同的buffer上同时进行,所以不需要额外加锁,减少了数据竞争和锁冲突的可能性。当然这种数据结构也有相应的缺点,就是会多用了一倍的内存,用空间换时间。
5.对于特定的场景采用特定的处理方式
这其实也很容易理解其实有很多场景是需要定制化优化的,所以不能从主体代码的层面去优化了,那换个思路,是不是可以从返回的数据格式进行优化呢?举个我们过去遇到的一个例子:我们平台有一个染色场景,就是需要对当天登录的所有微信用户计算命中情况,旧的数据格式其实返回了一堆本身染色场景不需要的字段,所以这里其实是可以优化的。
优化前的数据格式:
struct expt_param_item { int experiment_id; int expt_group_id; int layer_id; int domain_id; uint32_t seq; uint32_t start_time; uint32_t end_time; uint8_t expt_type; uint16_t expt_client_expand; int parameter_id; uint8_t value[MAX_PARAMETER_VLEN]; char param_name[MAX_PARAMETER_NLEN]; int value_len; uint8_t is_pkg = 0; uint8_t is_white_list = 0; uint8_t is_launch = 0; uint64_t bucket_src = 0; uint8_t is_control = 0; };
其实染色场景下不需要参数的信息,只保留实验ID、组ID以及分桶的信息就好了。优化后的数据格式:
struct DyeHitInfo { int expt_id, group_id; uint64_t bucket_src; DyeHitInfo(){}DyeHitInfo(int expt_id_, int group_id_, uint64_t bucket_src_) :expt_id(expt_id_), group_id(group_id_), bucket_src(bucket_src_){} bool operator <(const DyeHitInfo &hit) const { if (expt_id == hit.expt_id) { if (group_id == hit.group_id) return bucket_src < hit.bucket_src; return group_id < hit.group_id; }return expt_id < hit.expt_id;} bool operator==(const DyeHitInfo &hit) { return hit.expt_id == expt_id && hit.group_id == group_id && hit.bucket_src == bucket_src; } std::string ToString() const { char buf[1024]; sprintf(buf, "expt_id: %u, group_id: %u, bucket_src: %lu", expt_id, group_id, bucket_src); return std::string(buf); } };
优化前后性能对比:
所以其实针对某些特殊场景做一些定制化的开发成本也没有很高,但是带来的收益却是巨大的。
6.善用性能测试工具
这里列举一些常见的性能测试工具:linux提供的perf、GNU编译器提供的gprof、Valgrind、strace等等。
这里推荐几个觉得好用的工具:
perf(linux自带性能测试工具)
https://godbolt.org/(可以查看代码对应的汇编代码)
微信运维提供的性能测试工具
https://github.com/brendangregg/FlameGraph (生成火焰图的工具)
7.总结
其实还有一些性能优化的地方,比如使用合适的数据结构和算法、减少大对象的拷贝,减少无效的计算,IO与计算分离,分支预测等等,后续如果有时间的话可以在补充点内容。性能优化不是一锤子买卖,所以需要一直监控,一直优化。需要注意的一点是不要过度优化,在提升程序性能的时候不要丢掉代码的可维护性,而且还要评估下性能提升带来的收益是否与花费的时间成正比。总之,性能优化,长路漫漫。
在这里要感谢下团队小伙伴的帮助。
最后请各位大佬多多指教。
8.参考
(1). https://colin-scott.github.io/personal_website/research/interactive_latency.html
(2). https://zhuanlan.zhihu.com/p/484951383
(3). https://www.brendangregg.com/perf.html
(4). https://jemalloc.net/