作者 herman
导语
本文源自herman的系列文章之一《鹅厂开源框架TARS之根底组件》。相干代码已按TARS开源社区最新版本更新。
TARS开源框架库外面用C++实现了比拟多的专用组件,这些组件个别对立放在 util
文件夹,在应用层也能够自在应用,工欲善其事必先利其器,所以有必要把这些工具组件做理解,更好的应用,提高效率。接下来,本文将对如下TarsCpp组件进行剖析:
线程操作
- 线程平安队列: TC_ThreadQueue
- 一般线程锁: TC_ThreadLock
- 线程基类: TC_Thread
智能指针
- 智能指针类: TC_AutoPtr
DB操作
- MySQL操作类: TC_Mysql
网络操作
- 网络组件
服务配置
- 命令解析类: TC_Option
- 配置文件类: TC_Config
仿函数
- 通用仿函数类: TC_Functor
Hash
- hash算法
异样解决
- 异样类: TC_Exception
线程平安队列: TC_ThreadQueue
先看下框架对TC_ThreadQueue
类的应用如下:
typedef TC_ThreadQueue<tagRecvData*, deque<tagRecvData*> > recv_queue; // 接管队列typedef TC_ThreadQueue<tagSendData*, deque<tagSendData*> > send_queue; // 发送队列
TC_ThreadQueue
的实现比较简单,在TARS的网络层实现中能够发现这个类比拟重要,因为从框架中收到的网络包都会退出到这个缓存队列外面,而后多业务线程 ServantHandle
会调用 waitForRecvQueue
从该队列外面取网络数据包,而后调用 dispatch
调用协定音讯对应的处理函数,先看下框架对 TC_ThreadQueue
的实现:
/** * @brief 线程平安队列 */template<typename T, typename D = deque<T> >class TC_ThreadQueue{public: TC_ThreadQueue():_size(0){};public: typedef D queue_type; /** * @brief 从头部获取数据, 没有数据抛异样 * * @param t * @return bool: true, 获取了数据, false, 无数据 */ T front(); /** * @brief 从头部获取数据, 没有数据则期待. * * @param t * @param millsecond(wait = true时才失效) 阻塞等待时间(ms) * 0 示意不阻塞 * -1 永恒期待 * @param wait, 是否wait * @return bool: true, 获取了数据, false, 无数据 */ bool pop_front(T& t, size_t millsecond = 0, bool wait = true); ... ...}
TC_ThreadQueue
应用了C++11规范库中的<mutex>
和<condition_variable>
用于实现线程锁和 wait,如下,看下队列的成员函数:push_front
在队列后面退出数据,
template<typename T, typename D> void TC_ThreadQueue<T, D>::push_front(const T& t, bool notify){ if(notify) { std::unique_lock<std::mutex> lock(_mutex); _cond.notify_one(); _queue.push_front(t); ++_size; } else { std::lock_guard<std::mutex> lock (_mutex); _queue.push_front(t); ++_size; }}
如上图调用push_front
函数的时候调用 std::unique_lock<std::mutex> lock(_mutex)
加锁 ,防止网络层接收数据和业务层取同一队列的数据抵触,_cond.notify_one()
告诉期待在该锁上某一个线程醒过来,调用该函数之前必须加锁,因为有数据过去了,例如网络层有线程须要取包并进行散发解决。
再看一个成员函数pop_front
,从头部获取数据,没有数据则期待。millisecond
阻塞等待时间(ms)
0
示意不阻塞-1
永恒期待
template<typename T, typename D> bool TC_ThreadQueue<T, D>::pop_front(T& t, size_t millsecond, bool wait){ if(wait) { std::unique_lock<std::mutex> lock(_mutex); if (_queue.empty()) { if (millsecond == 0) { return false; } if (millsecond == (size_t) -1) { _cond.wait(lock); } else { //超时了 if (_cond.wait_for(lock, std::chrono::milliseconds(millsecond)) == std::cv_status::timeout) { return false; } } } if (_queue.empty()) { return false; } t = _queue.front(); _queue.pop_front(); assert(_size > 0); --_size; return true; } else { std::lock_guard<std::mutex> lock (_mutex); if (_queue.empty()) { return false; } t = _queue.front(); _queue.pop_front(); assert(_size > 0); --_size; return true; }}
BindAdapter::waitForRecvQueue
的函数就是调用了pop_front
函数,用于期待接管队列,函数原型如下:
bool TC_EpollServer::BindAdapter::waitForRecvQueue(uint32_t handleIndex, shared_ptr<RecvContext> &data){ bool bRet = getRecvQueue(handleIndex).pop_front(data); if (!bRet) { return bRet; } --_iRecvBufferSize; return bRet;}
这里BindAdapter::waitForRecvQueue
用于业务线程在期待服务器监听的适配器收到网络包后进行业务包的解决,这里传入的handleIndex
示意接管队列索引,获取对应的_rbuffer
。
一般线程锁: TC_ThreadLock
TC_ThreadLock
类的定义如下
typedef TC_Monitor<TC_ThreadMutex, TC_ThreadCond> TC_ThreadLock;
TC_Monitor
线程锁监控模板类。通常线程锁,都通过该类来应用,而不是间接用TC_ThreadMutex
、TC_ThreadRecMutex
。
类的定义template <class T, class P> class TC_Monitor
须要传入两个模板参数,TC_Monitor
包含以下成员变量:
mutable int _nnotify; // 上锁的次数mutable P _cond; // 条件变量T _mutex; // 互斥锁/** * @brief 定义锁管制对象 */typedef TC_LockT<TC_Monitor<T, P> > Lock;typedef TC_TryLockT<TC_Monitor<T, P> > TryLock;
第一个参数 TC_ThreadMutex
代表线程锁:同一个线程不能够反复加锁 ,蕴含成员变量
mutable std::mutex _mutex
延长浏览,这里tc_thread_mutex.h
还包含另外一个循环锁类TC_ThreadRecMutex
,即一个线程能够加屡次锁,定义如下:
// 定义于tc_monitor.h中typedef TC_Monitor<TC_ThreadRecMutex, TC_ThreadCond> TC_ThreadRecLock;
第二个参数 TC_ThreadCond
代表线程信号条件类:所有锁能够在下面期待信号产生,蕴含线程条件成员变量:
mutable std::condition_variable_any _cond
结合实际的应用场景,TC_Monitor::timedWait()
会调用 TC_ThreadCond
对象的 timedWait
函数,下一步调用 chrono
库的 milliseconds
;TC_ThreadCond::signal()
实现发送信号,期待在该条件上的一个线程会醒。
TC_LockT
类定义: template <typename T> class TC_LockT
锁模板类,与其余具体锁配合应用,结构时候加锁,析够的时候解锁。
TC_LockT
构造函数,传入互斥量初始化成员变量 _mutex
,TC_LockT
构造函数实现:
TC_LockT(const T& mutex) : _mutex(mutex) { _mutex.lock(); _acquired = true;}
到这里就能够看出 TC_Monitor
定义的 typedef TC_LockT<TC_Monitor<T, P> > Lock
,这里 Lock
类型的模板参数用的是 TC_Monitor
类。
理论应用场景如下:
Lock lock(*this);
TC_LockT
的构造函数,传入参数 this
为 TC_Monitor
的子类对象,TC_LockT
的结构函数调用_mutex.lock()
;理论就是调用了 TC_Monitor
对象的 lock
函数,TC_Monitor
的 lock
函数实现:
void lock() const{ _mutex.lock(); _nnotify = 0;}
这里 _mutex
为 TC_ThreadMutex
对象,进一步调用了 TC_ThreadRecMutex::lock()
成员函数,实现如下:
void TC_ThreadMutex::lock() const{ _mutex.lock();}
而后下面定义的lock栈变量退出函数的时候调用 TC_LockT
的析构函数:实现如下:
virtual ~TC_LockT(){ if (_acquired) { _mutex.unlock(); //这里会调用TC_Monitor的unlock函数 }}
TC_Monitor
的 unlock
函数实现:
void unlock() const{ notifyImpl(_nnotify); _mutex.unlock(); //这里会调用C++规范库<mutex>中的unlock}
这里调用 notifyImpl
函数是因为 TC_Monitor
类不只能够实现简略的互斥锁性能,还能够实现条件变量Condition性能,其中 notifyImpl
的实现为
void notifyImpl(int nnotify) const{ if(nnotify != 0) { if(nnotify == -1) { _cond.broadcast(); return; } else { while(nnotify > 0) { _cond.signal(); --nnotify; } } }}
线程基类: TC_Thread
还是老样子,先看下我的项目理论对线程基类的应用。理论我的项目应用中,咱们对 TC_Thread
又封装了一下,实现了一个BasicThread
类,上面看下 BasicThread
的定义:
class BasicThread : public tars::TC_Thread, public tars::TC_ThreadLock{ ... void terminate() { _bTerm = true; { Lock lock(*this); notifyAll(); } getThreadControl().join(); }}
BasicThread
类,继承了 TC_Thread
和 TC_ThreadLock
,其中 TC_ThreadLock
第二点曾经阐明过了,所以这里重点看下 TC_Thread
类的应用,TC_Thread
的定义
class TC_Thread : public TC_Runable{ ... /** * 应用了C++11规范线程库std::thread, 构造函数传参数threadEntry线程函数, * 返回 TC_ThreadControl(_th),其中_th为std::thread对象 */ TC_ThreadControl start(); static void threadEntry(TC_Thread *pThread); //动态函数, 线程入口 virtual void run() = 0; ...}
下一步看下线程管制类 TC_ThreadControl
的定义:
class TC_ThreadControl {...explicit TC_ThreadControl(std::thread *th); // 结构,传入std::thread对象void join(); // 调用std::thread的join()阻塞以后的线程,直到另外一个线程运行完结static void sleep(); // 调用std::this_thread::sleep函数线程将暂停执行...}
下一步看下 TC_Runable
的定义:
class TC_Runable{public: virtual ~TC_Runable(){}; virtual void run() = 0; //定义了run纯虚函数};
最初看下理论我的项目中对线程类的应用
class AntiSdkSyncThread : public BasicThread //这里等于多继承了TC_Thread和TC_ThreadLock两个类{ void run() //实现基类的纯虚函数 { Lock lock(*this); timedWait(10 * 1000); (距离执行工夫,实现了线程的定时执行性能) if(NULL != g_busi_interf) { Int32 ret = g_busi_interf->proc_(); //须要定期执行的函数 } }}
定义好了 AntiSdkSyncThread g_antiSdkSyncThread;
类,那么须要启动线程的时候执行g_antiSdkSyncThread.start();
就会天然创立线程,并且 threadEntry
线程函数会调用 pThread->run()
多态函数,过程退出的时候调用 g_antiSdkSyncThread.terminate();
。
智能指针类: TC_AutoPtr
这里的智能指针能够放在容器中,且线程平安的智能指针,CPP11规范库的auto_ptr
是不能放在容器中的,貌似曾经被淘汰了,目前少数应用CPP11规范库的shared_ptr
,不过须要编译器反对CPP11。
TC_HandleBase
智能指针基类的定义如下,所有须要智能指针的类都须要从该对象继承,其中应用了C++11规范库中的<atomic>
进行原子计数。
class UTIL_DLL_API TC_HandleBase{public: /** * @brief 复制 * * @return TC_HandleBase& */ TC_HandleBase& operator=(const TC_HandleBase&) { return *this; } /** * @brief 减少计数 */ void incRef() { ++_atomic; } /** * @brief 缩小计数 */ void decRef() { if((--_atomic) == 0 && !_bNoDelete) { _bNoDelete = true; delete this; } } /** * @brief 获取计数. * * @return int 计数值 */ int getRef() const { return _atomic; } /** * @brief 设置不主动开释. * * @param b 是否主动删除,true or false */ void setNoDelete(bool b) { _bNoDelete = b; }protected: /** * @brief 构造函数 */ TC_HandleBase() : _atomic(0), _bNoDelete(false) { } /** * @brief 拷贝结构 */ TC_HandleBase(const TC_HandleBase&) : _atomic(0), _bNoDelete(false) { } /** * @brief 析构 */ virtual ~TC_HandleBase() { }protected: std::atomic<int> _atomic; // 援用计数 bool _bNoDelete; // 是否主动删除};
下一步看 TC_AutoPtr
智能指针模板类,能够放在容器中,且线程平安的智能指针,该智能指针通过援用计数实现,其构造函数和析构函数定义如下:
template<typename T> class TC_AutoPtr{ TC_AutoPtr(T* p = 0) { _ptr = p; if(_ptr) { _ptr->incRef(); //构造函数 援用计算加1 } } ... ~TC_AutoPtr() { if(_ptr) { _ptr->decRef(); //析构函数 援用计算减1 } }}
例子:实战我的项目应用
struct ConnStruct : public TC_HandleBase{...}typedef TC_AutoPtr<ConnStruct> ConnStructPtr;
TC_AutoPtr
拷贝结构调用 _ptr->incRef();
这里 ptr
为 ConnStruct
,ConnStruct
继承于TC_HandleBase
,等于调用了TC_HandleBaseT<int>::incRef() {++_atomic;}
援用计数原子操作加1、析构援用计数原子操作减1,当援用计数缩小到0时依据设置的开关是否要进行删除来决定是否触发delete。
例子:这是TARS应用异步rpc回调的典型例子,这里回调类应用了智能指针
// 定义回调函数智能指针,其中SessionCallback父类继承于TC_HandleBasetypedef TC_AutoPtr<SessionCallback> SessionCallbackPtr; //创立回调类SessionCallbackPtr,并传入初始化参数uin gameid等;SessionCallbackPtr cb = new SessionCallback(iUin, iGameId, iSeqID, iCmd,sSessionID, theServant, current, cs, this);//异步调用sessionserver近程接口getSessionPrx()->async_getSession(cb, iUin, iGameId);
接口返回实现,回调SessionCallback::callback_getSession(tars::Int32 ret, const MGComm::SessionValue& retValue)
函数,接管sessionserver
接口的返回的SessionValue
构造。
因为 SessionCallbackPtr
应用了智能指针,所以业务不须要去手动开释后面 new
进去的 SessionCallbackPtr
,还是比拟不便的。
MySQL操作类: TC_Mysql
TC_Mysql封装好的mysql操作类,非线程平安,对于 insert/update 能够有更好的函数封装,避免SQL注入
应用形式:
TC_Mysql mysql;//初始化mysql,init时不链接,申请时主动建设链接;//数据库能够为空;//端口默认为3306mysql.init("192.168.1.2", "pc", "pc@sn", "db_tars_demo");
通常用:void init(const TC_DBConf& tcDBConf);
间接初始化数据库。例如:stDirectMysql.init(_stZoneDirectDBConf);
看下TC_DBConf
的定义
struct TC_DBConf{ string _host; string _user; string _password; string _database; string _charset; int _port; int _flag; //客户端标识 TC_DBConf() : _port(0) , _flag(0) {} /** * @brief 读取数据库配置. * * @param mpParam 寄存数据库配置的map * dbhost: 主机地址 * dbuser:用户名 * dbpass:明码 * dbname:数据库名称 * dbport:端口 */ void loadFromMap(const map<string, string> &mpParam) { map<string, string> mpTmp = mpParam; _host = mpTmp["dbhost"]; _user = mpTmp["dbuser"]; _password = mpTmp["dbpass"]; _database = mpTmp["dbname"]; _charset = mpTmp["charset"]; _port = atoi(mpTmp["dbport"].c_str()); _flag = 0; if(mpTmp["dbport"] == "") { _port = 3306; } }};
//进一步看下获取数据的应用TC_Mysql::MysqlData data;data = mysql.queryRecord("select * from t_app_users");for(size_t i = 0; i < data.size(); i++){ //如果不存在ID字段,则抛出异样 cout << data[i]["ID"] << endl;}
查问进去的mysql数据用MysqlData
封装
class MysqlData{ ... vector<map<string, string> >& data(); ...}
//插入数据,指定数据的类型:数值 或 字符串,对于字符串会主动本义map<string, pair<TC_Mysql::FT, string> > m;m["ID"] = make_pair(TC_Mysql::DB_INT, "2334");m["USERID"] = make_pair(TC_Mysql::DB_STR, "abcttt");m["APP"] = make_pair(TC_Mysql::DB_STR, "abcapbbp");m["LASTTIME"] = make_pair(TC_Mysql::DB_INT, "now()");mysql.replaceRecord("t_user_logs", m);
网络组件
整个TARS外围就提供一个很欠缺的网络框架,包含RPC性能,这里只介绍几个罕用的网络组件。
TC_Socket : 封装了socket的根本办法
提供socket的操作类;反对tcp/udp socket;反对本地区套接字。
再下一层TARS封装了TC_TCPClient
和TC_UDPClient
两个类用于实际操作tcp和udp利用。
应用形式:
例如:tcp客户端
TC_TCPClient stRouterClient;stRouterClient.init(sIP, iPort, iTimeOut); // 这里传入ip和端口而后调用sendRecv进行音讯的收发Int32 ret = stRouterClient.sendRecv(request.c_str(), request.length(), recvBuf, iRecvLen);
留神多线程应用的时候,不能多线程同时send/recv,小心串包。
TC_Epoller
提供网络epoll的操作类,默认是ET模式,当状态发生变化的时候才取得告诉,提供add、mod、del、wait等根底操作。
TC_ClientSocket : 客户端socket相干操作基类
提供要害成员函数init(const string &sIp, int iPort, int iTimeout)
,传入 IP 端口 和 超时工夫
TC_TCPClient
继承于 TC_ClientSocket
提供成员函数:
sendRecv
(发送到服务器, 从服务器返回不超过iRecvLen的字节)sendRecvBySep
( 发送倒服务器, 并期待服务器直到结尾字符, 蕴含结尾字符)
例子:
stRouterClient.init(sIP, iPort, iTimeOut);size_t iRecvLen = sizeof(recvBuf)-1;Int32 ret = stRouterClient.sendRecv(request.c_str(), request.length(), recvBuf, iRecvLen);
同理还有TC_UDPClient
实现UDP客户端。
命令解析类: TC_Option
- 命令解析类;
- 通常用于解析命令行参数;
- 只反对双—的参数模式
- 剖析
main
的输出参数,反对以下模式的参数:
./main.exe --name=value --param1 param2 param3
TC_Option op;//解析命令行op.decode(argc, argv);//获取成对的参数,即获取 - - 示意的所有参数对map<string, string> mp = op.getMulti();//示意非 – 的参数:即 param2, param3vector<string> d = op.getSingle();
如果value,param有空格或者 --
,用引号括起来就能够了。
配置文件类: TC_Config
- 配置文件解析类(兼容wbl模式);
- 反对从string中解析配置文件;
- 反对生成配置文件;
- 解析出错抛出异样;
- 采纳[]获取配置,如果无配置则抛出异样;
- 采纳get获取配置,不存在则返回空;
- 读取配置文件是线程平安的,insert域等函数非线程平安
例子:
TC_Config config;config.parseFile(ServerConfig::BasePath + ServerConfig::ServerName + ".conf");stTmpGameServerConfig.iGameId = TC_Common::strto<UInt32>(config["/Main/<GameId>"]);
配置文件样例
<Main> GameId = 3001 ZoneId = 102 AsyncThreadCheckInterval = 1000 ...</Main>
应用get
办法例子:如果读不到该配置,则返回默认值 sDefault
,即上面例子中的 20000000
stTmpGameServerConfig.iMaxRegNum = TC_Common::strto<Int32>(config.get("/Main/<MaxRegNum>", "20000000"));
通用仿函数类: TC_Functor
TC_Functor
参考loki
库的设计
- 仿函数对象调用形式, 即对上述的几种形式都能够在右侧增加一对圆括号,并在括号外部放一组适合的参数来调用,例如
a(p1,p2);
- 把整个调用(包含参数)封装一个函数对象, 调用对象建设时就传入了参数,调用的时候不必传入参数,例如
A a(p1, p2); a();
简略又好用的封装,具体见上面应用例子天然明确:
C函数调用
void TestFunction3(const string &s, int i){ cout << "TestFunction3('" << s << "', '" << i << "')" << endl; }//采纳函数指针结构对象TC_Functor<void, TL::TLMaker<const string&, int>::Result > cmd3(TestFunction3);string s3("s3");cmd3(s3, 10);
C函数调用用wrapper封装:
//调用封装,结构的时候传入参数TC_Functor<void,TL::TLMaker<const string&, int>::Result>::wrapper_type fwrapper3(cmd3, s3, 10);fwrapper3(); //参数曾经在结构的时候传入,调用的时候不必传参数了
阐明:
void
: 函数的返回值TL::TLMaker<const string&, int>::Result
: 代表参数类型
对于调用的封装,留神对于传援用类型,具体的调用时候要保障援用的对象存在。
C++指向类成员函数的调用
struct TestMember{ void mem3(const string &s, int i) { cout << "TestMember::mem3(" << s << "," << i << ") called" << endl; }}TC_Functor<void, TL::TLMaker<const string&, int>::Result > cmd3(&tm, &TestMember::mem3);cmd3("a", 33);
指向类成员函数的调用用wrapper封装:
TC_Functor<void, TL::TLMaker<const string&, int>::Result >::wrapper_type fwrapper3(cmd3, "a", 10);fwrapper3();
理论例子:注册协定解析器
服务初始化initialize的时候,个别会调用
addServantProtocol(sRouterObj, AppProtocol::parseStream<0, uint16_t, false>,iHeaderLen);
这里设置BindAdapter
的协定解析函数 protocol_functor _pf
为 parseStream
函数,如下:
/** * @param T * @param offset * @param netorder * @param in * @param out * @return int */template<size_t offset, typename T, bool netorder>static TC_NetWorkBuffer::PACKET_TYPE parseStream(TC_NetWorkBuffer& in,vector<char>& out){ size_t len = offset + sizeof(T); if (in.getBufferLength() < len) { return TC_NetWorkBuffer::PACKET_LESS; } string header; in.getHeader(len, header); assert(header.size() == len); T iHeaderLen = 0; ::memcpy(&iHeaderLen, header.c_str() + offset, sizeof(T)); if (netorder) { iHeaderLen = net2host<T>(iHeaderLen); } //长度爱护一下 if (iHeaderLen < (T)(len) || (uint32_t)iHeaderLen > TARS_NET_MAX_PACKAGE_SIZE) { return TC_NetWorkBuffer::PACKET_ERR; } if (in.getBufferLength() < (uint32_t)iHeaderLen) { return TC_NetWorkBuffer::PACKET_LESS; } in.getHeader(iHeaderLen, out); assert(out.size() == iHeaderLen); in.moveHeader(iHeaderLen); return TC_NetWorkBuffer::PACKET_FULL;}
注册好解析函数之后,网络层收包调用parseProtocol
函数
int TC_EpollServer::Connection::parseProtocol(TC_NetWorkBuffer &rbuf){ ... TC_NetWorkBuffer::PACKET_TYPE b = _pBindAdapter->getProtocol()(rbuf, ro); //这里回调后面设置好的协定解析函数,从而实现协定解析 ...}
hash算法
util/tc_hash_fun.h
中蕴含了对hash算法的实现,应用 hash_new
,能够对输出的字节流进行hash失去相当平均的hash值,应用形式如下
#include "util/tc_hash_fun.h"#include <iterator>#include <iostream>#include <sys/time.h>using namespace tars;using namespace std;int main(int argc, char* *argv[]){ unsigned int i = tars::hash_new<string>()("abcd"); cout << i << endl; return 0;}
异样类: TC_Exception
class TC_Exception : public exception{ /** * @brief 构造函数,提供了一个能够传入errno的构造函数, * 异样抛出时间接获取的错误信息 * * @param buffer 异样的告警信息 * @param err 错误码, 可用strerror获取错误信息 */ TC_Exception(const string &buffer, int err); }
总结
本文介绍剖析了TARS框架中用C++实现的专用根底组件,加深对这些工具类根底组件的了解,缩小在应用这些组件过程中产生的问题,进步开发效率。
TARS能够在思考到易用性和高性能的同时疾速构建零碎并主动生成代码,帮忙开发人员和企业以微服务的形式疾速构建本人稳固牢靠的分布式应用,从而令开发人员只关注业务逻辑,进步经营效率。多语言、麻利研发、高可用和高效经营的个性使 TARS 成为企业级产品。
TARS微服务助您数字化转型,欢送拜访:
TARS官网:https://TarsCloud.org
TARS源码:https://github.com/TarsCloud
获取《TARS官网培训电子书》:https://wj.qq.com/s2/6570357/...
或扫码获取: