1、环境筹备
php7/8+ swoole4.6+
2、示例代码
Coroutine\run(function () use ($db_work, $db_process) { $connPool = new PDOPool((new PDOConfig) ->withHost($db_work['host']) ->withPort(3306) // ->withUnixSocket('/tmp/mysql.sock') ->withDbName($db_work['database']) ->withCharset('utf8mb4') ->withUsername($db_work['user']) ->withPassword($db_work['password']) ); $processPool = new PDOPool((new PDOConfig) ->withHost($db_process['host']) ->withPort(3306) // ->withUnixSocket('/tmp/mysql.sock') ->withDbName($db_process['database']) ->withCharset('utf8mb4') ->withUsername($db_process['user']) ->withPassword($db_process['password']) ); $channel = new Channel(60); $conn = $connPool->get(); $data1 = $conn->query('SELECT count(*) FROM `goods_images` WHERE image_url!=\'\'')->fetchAll(PDO::FETCH_NUM); $data2 = $conn->query('SELECT count(*) FROM `goods_sku` WHERE original_img!=\'\' AND is_delete=0')->fetchAll(PDO::FETCH_NUM); $data3 = $conn->query('SELECT count(*) FROM `goods_spu` WHERE original_img!=\'\' AND is_delete=0')->fetchAll(PDO::FETCH_NUM); $connPool->put($conn); $goods_images_total = $data1[0][0]; $goods_sku_total = $data2[0][0]; $goods_spu_total = $data3[0][0]; $todo = [ 'goods_images' => $goods_images_total, 'goods_sku' => $goods_sku_total, 'goods_spu' => $goods_spu_total, ]; print_r($todo); Coroutine::create(function () use ($channel, $todo) { foreach ($todo as $table => $total) { $page = 0; //第一页开始 while ($page++ <= $total) { $channel->push(['page' => $page, 'table' => $table]); } } }); Coroutine::create(function () use ($channel, $connPool, $processPool) { while (1) { $data = $channel->pop(10); if ($data) { $page = $data['page']; $table = $data['table']; // todo echo($page . ' <== task执行结束 ok' . PHP_EOL); } else { if ($channel->isEmpty()) { echo 'images $channel:empty' . PHP_EOL; break; } } } });});
a.剖析建设能够独立运行的最小单位cell
两头异步瓶颈mysql也应用连接池,连接池容量应该大于协程工作池容量;应用mysql连接池留神“有借有还”,$conn = $connPool->get();$connPool->put($conn);
b.队列化需要参数,应用channel.push发送参数
减速执行如秒杀解决,该当应用队列。
c.应用channel.pop接管参数,供应cell执行
协程池增加-取用
保持一致