邮箱(deepin-mail)次要应用Qt框架开发,是一个有大量并行任务且须要监控进度和后果的我的项目,工作的优先级调整和反对勾销回滚也是必不可少。Qt曾经为咱们提供了多种线程设计的办法,能够满足绝大部分应用场景,然而每一种计划单拎进去都不能很恰到好处的在我的项目中应用,本文次要对我的项目中的线程设计进行介绍,通过将多个Qt线程计划相结合,衍生出一种适宜邮箱这类型我的项目的利用办法。

浅谈Qt中的线程办法

QThread

基于QThread的计划,须要子类化QThread实现run()办法,并在适合的时候调用start(),在这之前须要做好数据交换,并在线程执行过程中通过跨线程的connect()办法将数据传出,这里须要留神不能将连贯参数设置为Qt::DirectConnection,因为线程的数据须要通过事件循环传递来保障平安,在跨线程的connect()办法中参数即便应用了援用Qt也仍然会在emit时帮你额定触发一次参数拷贝,影响性能。

如果不是开销十分大的工作像这样间接继承其实不受举荐,当初更偏向于应用组合的形式,子类化QObject来创立一个Worker类,增加一个QThread成员变量,将Worker对象挪动到启动后的QThread对象线程中,这样Worker信号触发的槽函数都会在QThread对象启动的线程中运行。因为QThread也是QObject派生类, 偷懒的人能够让Worker间接继承QThread而后moveToThread(this),当然这样做须要对QThread的了解更 透彻一些,所以官网也不倡议这种写法因为QThread是被设计成一个治理线程的类不应参加过多业务逻辑。

基于QThread的计划尤其要留神定时器和套接字的析构问题,确保他们在创立的线程中应用和析构,QThread的使用者会不留神的将他们在线程中创立和应用,却在QThread本人的析构函数中析构(QThread对象的析构是在他所在的线程而不是本人开启的线程)。

QRunnable

基于QRunnable的计划,将工作分解成QRunnable对象间接放入QThreadPool中执行,应用上和QThread一样咱们实现他的run()办法,然而能且只能通过QThreadPool来执行,本身不蕴含其余的性能个性,开销很小。但就是因为这样,他和外界替换数据或者线程同步就变得相当麻烦,因为QRunnable不是QObject派生类,无奈应用Qt的信号槽,不做一些解决很难优雅地将进度和后果抛出,如果用同时继承QRunnable和QObject的形式进行来增加信号槽机制不如间接应用QThread了。还有一种形式,在QRunnable对象中保留一些传入的援用或指针来做消息传递,这样数据能够通过原子 变量或互斥锁来实现更新,指针能够应用元对象零碎中的办法QMetaObject::invokeMethod()通过事件循环传出音讯,但理论使用起来仍旧麻烦,每个数据包含指针你都要思考他的跨线程竞争问题,不得不控制参数的数量,将每个工作尽可能的切割成更小的工作。

QtConcurrent

还有一种基于QtConcurrent框架的计划,是Qt的高级别线程个性,反对多种线程治理的办法, 只有把办法或者lambda表达式传入run()并指定一个线程池(默认是全局线程池QThreadPool::globalInstance())就实现了开线程执行直到返回后果的一系列流程,它会主动将任务分配到可用的处理器外围上执行,最大化利用当初外围越来越多的cpu架构。它的run()办法重载数量十分多,包含 异步和同步期待线程执行实现的办法。抉择其中的异步办法,就能够通过监控返回的QFuture对象来失去线程状态和后果,次要办法的官网形容如下:

Concurrent Map and Map-Reduce
QtConcurrent::map() applies a function to every item in a container, modifying the 
items in-place.
QtConcurrent::mapped() is like map(), except that it returns a new container with 
the modifications.
QtConcurrent::mappedReduced() is like mapped(), except that the modified results 
are reduced or folded into a single result.
Concurrent Filter and Filter-Reduce
QtConcurrent::filter() removes all items from a container based on the result of a 
filter function.
QtConcurrent::filtered() is like filter(), except that it returns a new container with 
the filtered results.
QtConcurrent::filteredReduced() is like filtered(), except that the filtered results are 
reduced or folded into a single result.
Concurrent Run
QtConcurrent::run() runs a function in another thread.

横向比拟以上计划

以上计划并没有一种是齐全优于另一种的,每种都有它适应的场景,灵活运用满足我的项目的须要是最重要的。

邮箱中的线程计划

心路历程

QtConcurrent计划中基于future的机制非常适合邮箱我的项目做前后端拆散,后端只有提供QFuture对象进来被前端监控,其余的工作包含任务调度都在后端外部实现,实现解耦。但总体上它的设计初衷更多是为了应用简略而不是功能强大,对并行处理的线程减少了映射、过滤和规约算法却减弱了对线程外部的管制和线程之间的消息传递、同步能力,而QFuture对象提供了pause()和cancel()等办法 只能影响线程池中还未执行的工作。

心愿应用QtConcurrent个性的同时还要解决一些理论问题,要可能在线程池中对不同优先级的
线程进行排序,要能把握耗时工作的实时进度并可能对其暂停和勾销,操作的影响具体到工作中操
作数据库、读写文件或者和服务器交互中的某一步,这样在局部勾销后能够做一些回滚来保障工作
的原子性。

既然Qt应用了future来命名,其实Qt曾经实现了 future-promise机制,还在本人的源码中大量的应用 。如果察看QFuture和QThreadPool的源码,时不时就会看到一个叫QFutureInterface的类, Qt的帮忙文档中不蕴含相干材料,然而别看他叫做"interface",其实他是能够间接应用的,并且领有着满足我的项目须要的办法。有趣味的同学能够浏览相干源码来理解,如果在源码中看到以下的形容不要缓和,始终追溯到Qt5的最初一个版本,这些接口也是存在并且稳固的。

计划革新

为了更好的利用这个个性,须要对它进行了革新以接地气一些,咱们通过应用QThreadPool +
QRunnable计划中的设计思路来控制线程池工作 。

首先继承QRunnable创立一个类模板用于前面衍生出各式各样的工作:

template <typename T>
class AbstractTask : public QRunnable
{
public:
QFuture<T> future();
protected:
inline void reportStarted();
inline void setProgressRange(int minimum, int maximum);
inline void setProgressValueAndText(int progressValue, const QString &progressText =
"");
inline void reportResult(const T &result, const int &index = -1);
inline void reportFinished(const T *result = 0);
virtual void run() = 0;
private:
QFutureInterface<T> d;
};

模板参数是每个工作想要对外提供的返回后果,能够只返回错误码和谬误形容用于示意执行后果,也能够增加更多的参数比方同时将下载的文件通过future返回。不必放心额定的拷贝开销,因
为另外一个让人省心的中央是,QFutureInterface曾经通过援用计数为本人实现了隐式共享。

template <typename T>
class QFutureInterface : public QFutureInterfaceBase
{
public:
QFutureInterface(State initialState = NoState)
: QFutureInterfaceBase(initialState)
{
refT();
}
QFutureInterface(const QFutureInterface &other)
: QFutureInterfaceBase(other)
{
refT();
}
~QFutureInterface()
{
if (!derefT())
resultStoreBase().template clear<T>();
}
...
}

QFutureInterface通过原子变量来实现援用计数,提供一个平台无关的原子操作,但并不是所有的处理器都反对QAtomicInt,现在国产芯片百家争鸣,如果你是非凡的架构,应用前检测一下以后处理器是否反对某个API是很重要的。应用原子变量会比应用互斥锁的形式更加简略和高效:

class RefCount
{
public:
inline RefCount(int r = 0, int rt = 0): m_refCount(r), m_refCountT(rt) {}
inline bool ref() { return m_refCount.ref(); }
inline bool deref() { return m_refCount.deref(); }
inline int load() const { return m_refCount.load(); }inline bool refT() { return m_refCountT.ref(); }
inline bool derefT() { return m_refCountT.deref(); }
inline int loadT() const { return m_refCountT.load(); }
private:
QAtomicInt m_refCount;
QAtomicInt m_refCountT;
};

QFutureInterface通过future()办法创立出的QFuture都会存有一份本人的援用实例,参加了援用计数 的计算。只有当所有的QFutureInterface对象都被析构(包含QFuture中的),他们所指向的result()后果空间才也会开释。出于灵便同一个工作是能够返回多个QFuture对象散发到不同的模块以被监控的, 但在工作实现后记得重置QFuture以开释内存,赋值为一个QFuture()即可。

template <typename T>
class QFuture
{
public:
explicit QFuture(QFutureInterface<T> *p)
: d(*p)
{ }
mutable QFutureInterface<T> d;
}
template <typename T>
class QFutureInterface : public QFutureInterfaceBase
{
inline QFuture<T> future()
{
return QFuture<T>(this);
}
}

应用案例

上图是工作流转的一个简要流程,联合这个流程上面将给出我的项目中的一个实现,一个为本人的账号创立邮件目录的工作:

class CreateMailFolderTask : public AbstractTask<ResultResponse>
{
public:
CreateMailFolderTask(QSharedPointer<ProtocolEngine> engine, QSharedPointer<ProtocolCache> cache);
void run();
void initParam(const QString &folderName);
private:
QSharedPointer<ProtocolEngine> m_engine;
QSharedPointer<ProtocolCache> m_cache;
QString m_folderName;
};
...
void CreateMailFolderTask::run()
{
setProgressRange(0, 100);
reportProgressValueAndText(0,"Started");
//do something
const ResultResponse &result = m_engine->createMailFolder(m_folderName);
reportProgressValueAndText(50,"Ongoing");
//different processing according to d.isPaused() , d.isCanceled()
if (RESULT_SUCCESS == result.type)
m_cache->createFolderId(m_folderName);//do something
reportProgressValueAndText(100,"Finished");
reportResult(result);
reportFinished();
}

首先按咱们的模板实现一个创立目录的工作,在工作的run()办法中实现相干性能,这时候就能够依据须要自在的通过多种report()办法将进度、状态形容和后果抛出,以便内部能够在每个节点获取当前任务的状态,依据是否被暂停或者被勾销等通过QFuture设置的状态来做出不同的解决,如果有必要比方在邮箱我的项目中,咱们传递了一个QAtomic原子变量到工作甚至子工作中,进行更加精准的管制。类中有两个成员变量m_engine和m_cache,这个是我的项目中用于执行邮件协定和本地缓存代码的管制类,线程平安,不做过多扩大阐明。接下来是应用:

QFuture<ResultResponse> createFolder(const QString &folderId){
CreateMailFolderTask *task = new CreateMailFolderTask(m_protocolEngine, m_protocolCache);
task->initParam(folderId);
QFuture<PrepareResponse> future = task->future();
emit sigNewTask(task);
return future;
}

咱们创立了一个工作,然而工作并不需要立即开始,而是通过信号将task抛出期待解决,能够
在适合的时候通过线程池pool->tryStart(task)来执行,能够丢在数据结构中保留下来进行优先级排序后期待唤起,还能够格式化存储到文件中保留退出期待下次利用启动后继续执行。拿到QFuture 对象的模块立即就可能进行监控,是否开始、是否完结和进度都能够通过QFuture的办法获取或应用QFutursSynchronizer组合监控,也能够通过QFutureWatcher监控QFuture实现被动解决,这个具体看看官网阐明即可:

QFuture represents the result of an asynchronous computation.
QFutureIterator allows iterating through results available via QFuture.
QFutureWatcher allows monitoring a QFuture using signals-and-slots.
QFutureSynchronizer is a convenience class that automatically synchronizes several
QFutures.
QFutureWatcher<ResultResponse> watcher;
watcher.setFuture(future);
QObject::connect(&watcher, &QFutureWatcher<ResultResponse>::progressValueChanged,
= {
progressValue;//do something});

工作的返回能够通过QFuture的result()办法获取,如果是逐渐抛出后果的批处理工作,能够通过results()或者resultAt(int index)办法获取已获得的后果列表。result()的提前调用不会产生谬误,它会 阻塞以后的线程,期待工作实现后失去后果才会持续向下执行,也能够被动调用waitForFinished() 办法阻塞期待工作实现达到一样的成果。阻塞期待能够不必为了耗时极短的工作去写监控代码,也为写单元测试代码带来了十分大的便利性:

#include <gtest/gtest.h>
TEST_F(ut_session, createFolder){
QFuture<ResultResponse> future = session->createFolder("MyBox");
future.waitForFinished();
EXPECT_TRUE(ResultCode::Success == future.result().code);
}

小结

总结一下,Qt future-promise联合QRunnable的计划非常灵便,其实还有很多个性没有在此演
示,咱们曾经将它落地在邮箱我的项目中,接口稳固运行,获得了不错的成果。