共计 7493 个字符,预计需要花费 19 分钟才能阅读完成。
前言
在上一篇文章《Golang 实现简单爬虫框架(4)——队列实现并发任务调度》中,我们使用用队列实现了任务调度,接下来首先对两种并发方式做一个同构,使代码统一。然后添加数据存储模块。
注意:本次并发是在上一篇文章简单并发实现的基础上修改,所以没有贴出全部代码,只是贴出部分修改部分,要查看完整项目代码,可以查看上篇文章,或者从 github 下载项目源代码查看
1、项目重构
(1)并发引擎
通过分析我们发现,两种不同调度的区别是每个 worker
一个 channel
还是 所有 worker
共用一个 channel
,所以我们在接口中定义一个函数WorkerChan()
,用来决定这件事,即worker
一个 channel
还是 所有 worker
共用一个 channel
。此时ConfigMasterWorkerChan
就不再需要了。
在项目文件 concurrent.go 中我们定义一个任务调度器 Scheduler,如下:
// 任务调度器 | |
type Scheduler interface {Submit(request Request) // 提交任务 | |
ConfigMasterWorkerChan(chan Request) | |
WorkerReady(w chan Request) | |
Run()} |
但是在简单并发中我们只实现了 Submit
和ConfigMasterWorkerChan
接口,而使用队列调度中却实现了接口的所有方法,所有我们同构一下使 concurrent.go
文件可以适用于两种不同的调度器。
因为在 createworker
函数中要使用 WorkerReady
函数,所以要传入一个 Scheduler
,但是这样显得比较重,我们可以利用接口组合,新建一个接口ReadyNotifier
,这样在createworker
函数中传入 ReadyNotifier
即可。
修改后的任务调度如下:
type Scheduler interface { | |
ReadyNotifier | |
Submit(request Request) // 提交任务 | |
WorkerChan() chan Request | |
Run()} | |
type ReadyNotifier interface {WorkerReady(chan Request) | |
} |
此时创建 goroutine 修改如下:
// 创建 goroutine | |
for i := 0; i < e.WorkerCount; i++ { | |
// 任务是每个 worker 一个 channel 还是 所有 worker 共用一个 channel 由 WorkerChan 来决定 | |
createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler) | |
} |
修改后的 concurrent.go 文件如下:
package engine | |
import ("log") | |
// 并发引擎 | |
type ConcurrendEngine struct { | |
Scheduler Scheduler | |
WorkerCount int | |
} | |
// 任务调度器 | |
type Scheduler interface { | |
ReadyNotifier | |
Submit(request Request) // 提交任务 | |
WorkerChan() chan Request | |
Run()} | |
type ReadyNotifier interface {WorkerReady(chan Request) | |
} | |
func (e *ConcurrendEngine) Run(seeds ...Request) {out := make(chan ParseResult) | |
e.Scheduler.Run() | |
// 创建 goruntine | |
for i := 0; i < e.WorkerCount; i++ { | |
// 任务是每个 worker 一个 channel 还是 所有 worker 共用一个 channel 由 WorkerChan 来决定 | |
createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler) | |
} | |
// engine 把请求任务提交给 Scheduler | |
for _, request := range seeds {e.Scheduler.Submit(request) | |
} | |
itemCount := 0 | |
for { | |
// 接受 Worker 的解析结果 | |
result := <-out | |
for _, item := range result.Items {log.Printf("Got item: #%d: %v\n", itemCount, item) | |
itemCount++ | |
} | |
// 然后把 Worker 解析出的 Request 送给 Scheduler | |
for _, request := range result.Requests {e.Scheduler.Submit(request) | |
} | |
} | |
} | |
func createWorker(in chan Request, out chan ParseResult, ready ReadyNotifier) {go func() { | |
for {ready.WorkerReady(in) // 告诉调度器任务空闲 | |
request := <-in | |
result, err := worker(request) | |
if err != nil {continue} | |
out <- result | |
} | |
}()} |
(2)简单并发调度器
scheduler/simple.go
package scheduler | |
import "crawler/engine" | |
type SimpleScheduler struct {workerChan chan engine.Request} | |
func (s *SimpleScheduler) WorkerChan() chan engine.Request { | |
// 此时所有 worker 共用同一个 channel,直接返回即可 | |
return s.workerChan | |
} | |
func (s *SimpleScheduler) WorkerReady(w chan engine.Request) { | |
} | |
func (s *SimpleScheduler) Run() { | |
// 创建出 workchannel | |
s.workerChan = make(chan engine.Request) | |
} | |
func (s *SimpleScheduler) Submit(request engine.Request) { | |
// send request down to worker chan | |
go func() {s.workerChan <- request}()} |
(3)队列实现调度器
scheduler/queued.go
添加 WorkerChan()
的实现即可
package scheduler | |
import "crawler/engine" | |
// 使用队列来调度任务 | |
type QueuedScheduler struct { | |
requestChan chan engine.Request | |
workerChan chan chan engine.Request | |
} | |
func (s *QueuedScheduler) WorkerChan() chan engine.Request { | |
// 对于队列实现来讲,每个 worker 共用一个 channel | |
return make(chan engine.Request) | |
} | |
// 提交请求任务到 requestChan | |
func (s *QueuedScheduler) Submit(request engine.Request) {s.requestChan <- request} | |
// 告诉外界有一个 worker 可以接收 request | |
func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {s.workerChan <- w} | |
func (s *QueuedScheduler) Run() {s.workerChan = make(chan chan engine.Request) | |
s.requestChan = make(chan engine.Request) | |
go func() { | |
// 创建请求队列和工作队列 | |
var requestQ []engine.Request | |
var workerQ []chan engine.Request | |
for { | |
var activeWorker chan engine.Request | |
var activeRequest engine.Request | |
if len(requestQ) > 0 && len(workerQ) > 0 {activeWorker = workerQ[0] | |
activeRequest = requestQ[0] | |
} | |
select { | |
case r := <-s.requestChan: // 当 requestChan 收到数据 | |
requestQ = append(requestQ, r) | |
case w := <-s.workerChan: // 当 workerChan 收到数据 | |
workerQ = append(workerQ, w) | |
case activeWorker <- activeRequest: // 当请求队列和认读队列都不为空时,给任务队列分配任务 | |
requestQ = requestQ[1:] | |
workerQ = workerQ[1:] | |
} | |
} | |
}()} |
(4)main 函数
经过上述同构,在 main 函数中如需切换不同调度器,只需要相应的配置即可。
package main | |
import ( | |
"crawler/engine" | |
"crawler/scheduler" | |
"crawler/zhenai/parser" | |
) | |
func main() { | |
e := engine.ConcurrendEngine{//Scheduler: &scheduler.QueuedScheduler{}, // 队列实现调度器 | |
Scheduler: &scheduler.SimpleScheduler{}, // 简单并发调度 | |
WorkerCount: 50, | |
} | |
e.Run(engine.Request{ | |
Url: "http://www.zhenai.com/zhenghun", | |
ParseFunc: parser.ParseCityList, | |
}) | |
} |
2、数据存储
(1)Mgo 的介绍安装
爬取到的数据不能仅仅在控制台打印出来,所以我们还要给爬虫添加数据存储模块。我们本次选择使用 mongodb 来存储我们的数据。
mgo(音 mango)是 MongoDB 的 Go 语言驱动,它用基于 Go 语法的简单 API 实现了丰富的特性,并经过良好测试。
官方网址:http://labix.org/mgo
文档:API docs for mgo
首先我们要安装 mgo,打开终端,输入下面代码完成安装
go get gopkg.in/mgo.v2
mgo 基本操作都很简单,有数据库操作经验都可以很快上手。
(2)爬虫引擎与数据格式
首先,爬虫引擎获取到数据要把数据发送给数据存储模块,而数据的传递用要用到 channel
,所以打开concurrent.go
文件,在引擎添加 ItemChan
属性,如下所示:
爬取到数据需要把数据发送到数据存储模块,
package engine | |
// 并发引擎 | |
type ConcurrendEngine struct { | |
Scheduler Scheduler // 任务调度器 | |
WorkerCount int // 并发任务数量 | |
ItemChan chan Item // 数据保存 channel | |
} | |
// ... | |
for { | |
// 接受 Worker 的解析结果 | |
result := <-out | |
for _, item := range result.Items { | |
// 当抓取一组数据后,进行保存 | |
go func(item2 Item) {e.ItemChan <- item2}(item) | |
} | |
// ... | |
} | |
// ... |
在 engine/types.go
中定义 Item 类型:
package engine | |
// 请求结构 | |
type Request struct { | |
Url string // 请求地址 | |
ParseFunc func([]byte) ParseResult | |
} | |
// 解析结果结构 | |
type ParseResult struct {Requests []Request // 解析出的请求 | |
Items []Item // 解析出的内容} | |
// 解析出的用户数据格式 | |
type Item struct { | |
Url string // 个人信息 Url 地址 | |
Type string // table | |
Id string // Id | |
Payload interface{} // 详细信息} | |
func NilParseFun([]byte) ParseResult {return ParseResult{} | |
} |
(3)存储模块的实现
在根目录下创建 persist 文件夹,然后创建 itemsaver.go 文件
// persist/itemsaver.go | |
package persist | |
import ( | |
"context" | |
"crawler/engine" | |
"errors" | |
"gopkg.in/mgo.v2" | |
"gopkg.in/olivere/elastic.v5" | |
"log" | |
) | |
func ItemSaver(index string) (chan engine.Item, error) { | |
// mongodb connect | |
session, err := mgo.Dial("localhost:27017") | |
if err != nil {panic(err) | |
} | |
out := make(chan engine.Item) | |
go func() { | |
itemCount := 0 | |
for { | |
// 接收到发送的 item | |
item := <-out | |
log.Printf("Item Saver: got item #%d: %v\n", | |
itemCount, item) | |
itemCount++ | |
// Save data in mongodb | |
err := mongo_save(session, index, item) | |
if err != nil { | |
// if have err, ignore it | |
log.Printf("Item Saver: error, saving item %v: %v", | |
item, err) | |
} | |
} | |
}() | |
return out, nil | |
} | |
// 使用 MongoDB 保存数据 | |
func mongo_save(session *mgo.Session, dbName string, item engine.Item) error { | |
if item.Type == "" {return errors.New("must supply Type") | |
} | |
c := session.DB(dbName).C(item.Type) // 选择要操作的数据库与集合 | |
err := c.Insert(item) // 插入数据 | |
if err != nil {log.Fatal(err) | |
} | |
return nil | |
} |
(4)存储测试文件
我们把一条数据存入 mongodb,然后再取出来,比对读出的数据和写入的数据是否相同
// persist/itemsaver_test.gp | |
package persist | |
import ( | |
"crawler/engine" | |
"crawler/model" | |
"encoding/json" | |
"fmt" | |
"gopkg.in/mgo.v2" | |
"gopkg.in/mgo.v2/bson" | |
"log" | |
"testing" | |
) | |
func TestMongoSave(t *testing.T) { | |
// mongodb connect | |
session, err := mgo.Dial("localhost:27017") | |
if err != nil {panic(err) | |
} | |
expected := engine.Item{ | |
Url: "http://album.zhenai.com/u/1946858930", | |
Type: "zhenai", | |
Id: "1946858930", | |
Payload: model.Profile{ | |
Name: "為你垨候", | |
Gender: "女士", | |
Age: 40, | |
Height: 163, | |
Weight: 54, | |
Income: "5- 8 千", | |
Marriage: "未婚", | |
Address: "佛山顺德区", | |
}, | |
} | |
// 保存数据 | |
err = mongo_save(session, "crawler", expected) | |
if err != nil {panic(err) | |
} | |
c := session.DB("crawler").C("zhenai") | |
var result engine.Item | |
// 查询数据 | |
err = c.Find(bson.M{"id": "1946858930"}).One(&result) | |
// result 为 Json 类型 | |
if err != nil {log.Fatal(err) | |
} | |
fmt.Printf("%s, %s, %v\n", result.Url, result.Id, result.Payload) | |
} |
(5)parser 模块
我们要在 parse/profile.go
文件中组装好需要保存到数据库的数据格式
// ... | |
result := engine.ParseResult{Items: []engine.Item{ | |
{ | |
Url: url, | |
Type: "zhenai", | |
Id: extractString([]byte(url), idUrlRe), | |
Payload: profile, | |
}, | |
}, | |
} | |
// ... |
(6)main 函数
package main | |
import ( | |
"crawler/engine" | |
"crawler/persist" | |
"crawler/scheduler" | |
"crawler/zhenai/parser" | |
) | |
func main() {itemChan, err := persist.ItemSaver() | |
if err != nil {panic(err) | |
} | |
e := engine.ConcurrendEngine{//Scheduler: &scheduler.QueuedScheduler{}, | |
Scheduler: &scheduler.SimpleScheduler{}, | |
WorkerCount: 100, | |
ItemChan: itemChan, | |
} | |
e.Run(engine.Request{ | |
Url: "http://www.zhenai.com/zhenghun", | |
ParseFunc: parser.ParseCityList, | |
}) | |
} |
运行项目,打开 mongodb 可视化工具,可以看到爬取了 54410 条数据
3、总结
我们首先把两种并发方式做一个同构,使代码统一,直接在 main 函数中使用不同的配置就可以切换调度器,简单方便。然后使用 Mgo 驱动操作数据,添加到 mongodb 中。内容有点多,很多代码没有完整的展示出来,希望大家可以下载项目源代码,回滚到对应提交记录查看,效果会更好。别无所求,只求随手给个 star
下篇博客中我们会再当前博客的基础上添加数据展示功能
如果想获取 Google 工程师深度讲解 go 语言视频资源的,可以在评论区留下邮箱。
如果觉得文章还可以,劳烦大人随手点个赞。。。