关于后端:邮箱中的Qt线程设计

8次阅读

共计 7417 个字符,预计需要花费 19 分钟才能阅读完成。

邮箱(deepin-mail)次要应用 Qt 框架开发,是一个有大量并行任务且须要监控进度和后果的我的项目,工作的优先级调整和反对勾销回滚也是必不可少。Qt 曾经为咱们提供了多种线程设计的办法,能够满足绝大部分应用场景,然而每一种计划单拎进去都不能很恰到好处的在我的项目中应用,本文次要对我的项目中的线程设计进行介绍,通过将多个 Qt 线程计划相结合,衍生出一种适宜邮箱这类型我的项目的利用办法。

浅谈 Qt 中的线程办法

QThread

基于 QThread 的计划,须要子类化 QThread 实现 run()办法,并在适合的时候调用 start(),在这之前须要做好数据交换,并在线程执行过程中通过跨线程的 connect()办法将数据传出,这里须要留神不能将连贯参数设置为 Qt::DirectConnection,因为线程的数据须要通过事件循环传递来保障平安,在跨线程的 connect()办法中参数即便应用了援用 Qt 也仍然会在 emit 时帮你额定触发一次参数拷贝,影响性能。

如果不是开销十分大的工作像这样间接继承其实不受举荐,当初更偏向于应用组合的形式,子类化 QObject 来创立一个 Worker 类,增加一个 QThread 成员变量,将 Worker 对象挪动到启动后的 QThread 对象线程中,这样 Worker 信号触发的槽函数都会在 QThread 对象启动的线程中运行。因为 QThread 也是 QObject 派生类,偷懒的人能够让 Worker 间接继承 QThread 而后 moveToThread(this),当然这样做须要对 QThread 的了解更 透彻一些,所以官网也不倡议这种写法因为 QThread 是被设计成一个治理线程的类不应参加过多业务逻辑。

基于 QThread 的计划尤其要留神定时器和套接字的析构问题,确保他们在创立的线程中应用和析构,QThread 的使用者会不留神的将他们在线程中创立和应用,却在 QThread 本人的析构函数中析构(QThread 对象的析构是在他所在的线程而不是本人开启的线程)。

QRunnable

基于 QRunnable 的计划,将工作分解成 QRunnable 对象间接放入 QThreadPool 中执行,应用上和 QThread 一样咱们实现他的 run()办法,然而能且只能通过 QThreadPool 来执行,本身不蕴含其余的性能个性,开销很小。但就是因为这样,他和外界替换数据或者线程同步就变得相当麻烦,因为 QRunnable 不是 QObject 派生类,无奈应用 Qt 的信号槽,不做一些解决很难优雅地将进度和后果抛出,如果用同时继承 QRunnable 和 QObject 的形式进行来增加信号槽机制不如间接应用 QThread 了。还有一种形式,在 QRunnable 对象中保留一些传入的援用或指针来做消息传递,这样数据能够通过原子 变量或互斥锁来实现更新,指针能够应用元对象零碎中的办法 QMetaObject::invokeMethod()通过事件循环传出音讯,但理论使用起来仍旧麻烦,每个数据包含指针你都要思考他的跨线程竞争问题,不得不控制参数的数量,将每个工作尽可能的切割成更小的工作。

QtConcurrent

还有一种基于 QtConcurrent 框架的计划,是 Qt 的高级别线程个性,反对多种线程治理的办法,只有把办法或者 lambda 表达式传入 run()并指定一个线程池 (默认是全局线程池 QThreadPool::globalInstance()) 就实现了开线程执行直到返回后果的一系列流程,它会主动将任务分配到可用的处理器外围上执行,最大化利用当初外围越来越多的 cpu 架构。它的 run()办法重载数量十分多,包含 异步和同步期待线程执行实现的办法。抉择其中的异步办法,就能够通过监控返回的 QFuture 对象来失去线程状态和后果,次要办法的官网形容如下:

Concurrent Map and Map-Reduce
QtConcurrent::map() applies a function to every item in a container, modifying the 
items in-place.
QtConcurrent::mapped() is like map(), except that it returns a new container with 
the modifications.
QtConcurrent::mappedReduced() is like mapped(), except that the modified results 
are reduced or folded into a single result.
Concurrent Filter and Filter-Reduce
QtConcurrent::filter() removes all items from a container based on the result of a 
filter function.
QtConcurrent::filtered() is like filter(), except that it returns a new container with 
the filtered results.
QtConcurrent::filteredReduced() is like filtered(), except that the filtered results are 
reduced or folded into a single result.
Concurrent Run
QtConcurrent::run() runs a function in another thread.

横向比拟以上计划

以上计划并没有一种是齐全优于另一种的,每种都有它适应的场景,灵活运用满足我的项目的须要是最重要的。

邮箱中的线程计划

心路历程

QtConcurrent 计划中基于 future 的机制非常适合邮箱我的项目做前后端拆散,后端只有提供 QFuture 对象进来被前端监控,其余的工作包含任务调度都在后端外部实现,实现解耦。但总体上它的设计初衷更多是为了应用简略而不是功能强大,对并行处理的线程减少了映射、过滤和规约算法却减弱了对线程外部的管制和线程之间的消息传递、同步能力,而 QFuture 对象提供了 pause()和 cancel()等办法 只能影响线程池中还未执行的工作。

心愿应用 QtConcurrent 个性的同时还要解决一些理论问题,要可能在线程池中对不同优先级的
线程进行排序,要能把握耗时工作的实时进度并可能对其暂停和勾销,操作的影响具体到工作中操
作数据库、读写文件或者和服务器交互中的某一步,这样在局部勾销后能够做一些回滚来保障工作
的原子性。

既然 Qt 应用了 future 来命名,其实 Qt 曾经实现了 future-promise 机制,还在本人的源码中大量的应用。如果察看 QFuture 和 QThreadPool 的源码,时不时就会看到一个叫 QFutureInterface 的类,Qt 的帮忙文档中不蕴含相干材料,然而别看他叫做 ”interface”,其实他是能够间接应用的,并且领有着满足我的项目须要的办法。有趣味的同学能够浏览相干源码来理解,如果在源码中看到以下的形容不要缓和,始终追溯到 Qt5 的最初一个版本,这些接口也是存在并且稳固的。

计划革新

为了更好的利用这个个性,须要对它进行了革新以接地气一些,咱们通过应用 QThreadPool +
QRunnable 计划中的设计思路来控制线程池工作。

首先继承 QRunnable 创立一个类模板用于前面衍生出各式各样的工作:

template <typename T>
class AbstractTask : public QRunnable
{
public:
QFuture<T> future();
protected:
inline void reportStarted();
inline void setProgressRange(int minimum, int maximum);
inline void setProgressValueAndText(int progressValue, const QString &progressText =
“”);
inline void reportResult(const T &result, const int &index = -1);
inline void reportFinished(const T *result = 0);
virtual void run() = 0;
private:
QFutureInterface<T> d;
};

模板参数是每个工作想要对外提供的返回后果,能够只返回错误码和谬误形容用于示意执行后果,也能够增加更多的参数比方同时将下载的文件通过 future 返回。不必放心额定的拷贝开销,因
为另外一个让人省心的中央是,QFutureInterface 曾经通过援用计数为本人实现了隐式共享。

template <typename T>
class QFutureInterface : public QFutureInterfaceBase
{
public:
QFutureInterface(State initialState = NoState)
: QFutureInterfaceBase(initialState)
{
refT();
}
QFutureInterface(const QFutureInterface &other)
: QFutureInterfaceBase(other)
{
refT();
}
~QFutureInterface()
{
if (!derefT())
resultStoreBase().template clear<T>();
}

}

QFutureInterface 通过原子变量来实现援用计数,提供一个平台无关的原子操作,但并不是所有的处理器都反对 QAtomicInt,现在国产芯片百家争鸣,如果你是非凡的架构,应用前检测一下以后处理器是否反对某个 API 是很重要的。应用原子变量会比应用互斥锁的形式更加简略和高效:

class RefCount
{
public:
inline RefCount(int r = 0, int rt = 0): m_refCount(r), m_refCountT(rt) {}
inline bool ref() { return m_refCount.ref(); }
inline bool deref() { return m_refCount.deref(); }
inline int load() const { return m_refCount.load(); }inline bool refT() { return m_refCountT.ref(); }
inline bool derefT() { return m_refCountT.deref(); }
inline int loadT() const { return m_refCountT.load(); }
private:
QAtomicInt m_refCount;
QAtomicInt m_refCountT;
};

QFutureInterface 通过 future()办法创立出的 QFuture 都会存有一份本人的援用实例,参加了援用计数 的计算。只有当所有的 QFutureInterface 对象都被析构 (包含 QFuture 中的),他们所指向的 result() 后果空间才也会开释。出于灵便同一个工作是能够返回多个 QFuture 对象散发到不同的模块以被监控的,但在工作实现后记得重置 QFuture 以开释内存,赋值为一个 QFuture()即可。

template <typename T>
class QFuture
{
public:
explicit QFuture(QFutureInterface<T> *p)
: d(*p)
{}
mutable QFutureInterface<T> d;
}
template <typename T>
class QFutureInterface : public QFutureInterfaceBase
{
inline QFuture<T> future()
{
return QFuture<T>(this);
}
}

应用案例

上图是工作流转的一个简要流程,联合这个流程上面将给出我的项目中的一个实现,一个为本人的账号创立邮件目录的工作:

class CreateMailFolderTask : public AbstractTask<ResultResponse>
{
public:
CreateMailFolderTask(QSharedPointer<ProtocolEngine> engine, QSharedPointer<ProtocolCache> cache);
void run();
void initParam(const QString &folderName);
private:
QSharedPointer<ProtocolEngine> m_engine;
QSharedPointer<ProtocolCache> m_cache;
QString m_folderName;
};

void CreateMailFolderTask::run()
{
setProgressRange(0, 100);
reportProgressValueAndText(0,”Started”);
//do something
const ResultResponse &result = m_engine->createMailFolder(m_folderName);
reportProgressValueAndText(50,”Ongoing”);
//different processing according to d.isPaused() , d.isCanceled()
if (RESULT_SUCCESS == result.type)
m_cache->createFolderId(m_folderName);//do something
reportProgressValueAndText(100,”Finished”);
reportResult(result);
reportFinished();
}

首先按咱们的模板实现一个创立目录的工作,在工作的 run()办法中实现相干性能,这时候就能够依据须要自在的通过多种 report()办法将进度、状态形容和后果抛出,以便内部能够在每个节点获取当前任务的状态,依据是否被暂停或者被勾销等通过 QFuture 设置的状态来做出不同的解决,如果有必要比方在邮箱我的项目中,咱们传递了一个 QAtomic 原子变量到工作甚至子工作中,进行更加精准的管制。类中有两个成员变量 m_engine 和 m_cache,这个是我的项目中用于执行邮件协定和本地缓存代码的管制类,线程平安,不做过多扩大阐明。接下来是应用:

QFuture<ResultResponse> createFolder(const QString &folderId){
CreateMailFolderTask *task = new CreateMailFolderTask(m_protocolEngine, m_protocolCache);
task->initParam(folderId);
QFuture<PrepareResponse> future = task->future();
emit sigNewTask(task);
return future;
}

咱们创立了一个工作,然而工作并不需要立即开始,而是通过信号将 task 抛出期待解决,能够
在适合的时候通过线程池 pool->tryStart(task)来执行,能够丢在数据结构中保留下来进行优先级排序后期待唤起,还能够格式化存储到文件中保留退出期待下次利用启动后继续执行。拿到 QFuture 对象的模块立即就可能进行监控,是否开始、是否完结和进度都能够通过 QFuture 的办法获取或应用 QFutursSynchronizer 组合监控,也能够通过 QFutureWatcher 监控 QFuture 实现被动解决,这个具体看看官网阐明即可:

QFuture represents the result of an asynchronous computation.
QFutureIterator allows iterating through results available via QFuture.
QFutureWatcher allows monitoring a QFuture using signals-and-slots.
QFutureSynchronizer is a convenience class that automatically synchronizes several
QFutures.
QFutureWatcher<ResultResponse> watcher;
watcher.setFuture(future);
QObject::connect(&watcher, &QFutureWatcher<ResultResponse>::progressValueChanged,
= {
progressValue;//do something});

工作的返回能够通过 QFuture 的 result()办法获取,如果是逐渐抛出后果的批处理工作,能够通过 results()或者 resultAt(int index)办法获取已获得的后果列表。result()的提前调用不会产生谬误,它会 阻塞以后的线程,期待工作实现后失去后果才会持续向下执行,也能够被动调用 waitForFinished() 办法阻塞期待工作实现达到一样的成果。阻塞期待能够不必为了耗时极短的工作去写监控代码,也为写单元测试代码带来了十分大的便利性:

#include <gtest/gtest.h>
TEST_F(ut_session, createFolder){
QFuture<ResultResponse> future = session->createFolder(“MyBox”);
future.waitForFinished();
EXPECT_TRUE(ResultCode::Success == future.result().code);
}

小结

总结一下,Qt future-promise 联合 QRunnable 的计划非常灵便,其实还有很多个性没有在此演
示,咱们曾经将它落地在邮箱我的项目中,接口稳固运行,获得了不错的成果。

正文完
 0