共计 5333 个字符,预计需要花费 14 分钟才能阅读完成。
前言
在文章《webmagic 核心设计和运行机制分析》中已经提到 WebMagic 内部是通过生产者 / 消费者模式来实现的,本篇我们就分析一下 WebMagic 的源代码,先从爬虫入口类 main 方法开始。
爬虫入口类 main 方法
public static void main(String[] args) {Spider.create(new GithubRepoPageProcessor())
// 从 https://github.com/code4craft 开始抓
.addUrl("https://github.com/code4craft")
// 设置 Scheduler,使用 Redis 来管理 URL 队列
.setScheduler(new RedisScheduler("localhost"))
// 设置 Pipeline,将结果以 json 方式保存到文件
.addPipeline(new JsonFilePipeline("D:\\data\\webmagic"))
// 开启 5 个线程同时执行
.thread(5)
// 启动爬虫
.run();}
通过官方给出的创建爬虫入口类的样例代码可以看到,启动爬虫是调用 Spider.run()
方法。
Spider 类源码分析
1. Spider.run()
方法
checkRunningStat()
检查运行状态,不是很重要,跳过。
@Override
public void run() {
// 检查运行状态
checkRunningStat();
// 初始化组件
initComponent();
logger.info("Spider {} started!",getUUID());
// 死循环从 Scheduler 中拉取 Request(Request 中封装了 url)while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {final Request request = scheduler.poll(this);
if (request == null) {if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {break;}
// wait until new url added
// 当 Scheduler 中不存在 Request 时,线程等待
waitNewUrl();} else {threadPool.execute(new Runnable() {
@Override
public void run() {
try {
// 处理 Request,核心方法
processRequest(request);
// 调用监听器 onSuccess()方法
onSuccess(request);
} catch (Exception e) {// 调用监听器 onError()方法
onError(request);
logger.error("process request" + request + "error", e);
} finally {pageCount.incrementAndGet();
// 唤醒线程
signalNewUrl();}
}
});
}
}
stat.set(STAT_STOPPED);
// release some resources
if (destroyWhenExit) {close();
}
logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
}
2. initComponent()
初始化组件:设置默认 Downloader 实现,初始化线程池
// 初始化组件
protected void initComponent() {if (downloader == null) {
// 默认使用 HttpClientDownloader
this.downloader = new HttpClientDownloader();}
if (pipelines.isEmpty()) {pipelines.add(new ConsolePipeline());
}
downloader.setThread(threadNum);
// 初始化线程池
if (threadPool == null || threadPool.isShutdown()) {if (executorService != null && !executorService.isShutdown()) {threadPool = new CountableThreadPool(threadNum, executorService);
} else {threadPool = new CountableThreadPool(threadNum);
}
}
if (startRequests != null) {for (Request request : startRequests) {addRequest(request);
}
startRequests.clear();}
startTime = new Date();}
3. waitNewUrl()
/ signalNewUrl()
:配合 Scheduler 对象实现生产者 / 消费者模式
// 线程等待
private void waitNewUrl() {newUrlLock.lock();
try {
//double check
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {return;}
newUrlCondition.await(emptySleepTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {logger.warn("waitNewUrl - interrupted, error {}", e);
} finally {newUrlLock.unlock();
}
}
// 线程唤醒
private void signalNewUrl() {
try {newUrlLock.lock();
newUrlCondition.signalAll();} finally {newUrlLock.unlock();
}
}
在 Spider 类属性中包含了默认 Scheduler 实现类 QueueScheduler 的对象 scheduler,而在 QueueScheduler 类中默认使用内存阻塞队列来存储 Request。
// Spider 类属性
protected Scheduler scheduler = new QueueScheduler();
public class QueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler {
// 默认使用内存阻塞队列来存储 Request 对象
private BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>();
@Override
public void pushWhenNoDuplicate(Request request, Task task) {queue.add(request);
}
@Override
public Request poll(Task task) {return queue.poll();
}
@Override
public int getLeftRequestsCount(Task task) {return queue.size();
}
@Override
public int getTotalRequestsCount(Task task) {return getDuplicateRemover().getTotalRequestsCount(task);
}
}
看到这里,有人可能会疑惑当阻塞队列中没有 Request 对象时,线程会卡死在 Spider.run()
方法中 waitNewUrl()
上。
其实只要我们看 Spider.addUrl()
和Spider.addRequest()
两个方法的源码,就会发现其中调用了线程唤醒方法 Spider.signalNewUrl()
,而在爬虫入口类中必然会调addUrl()
或addRequest()
其中一个来设置起始 url,所以不存在线程运行卡死的情况。
public Spider addUrl(String... urls) {for (String url : urls) {addRequest(new Request(url));
}
// 调用线程唤醒方法
signalNewUrl();
return this;
}
public Spider addRequest(Request... requests) {for (Request request : requests) {addRequest(request);
}
// 调用线程唤醒方法
signalNewUrl();
return this;
}
private void addRequest(Request request) {if (site.getDomain() == null && request != null && request.getUrl() != null) {site.setDomain(UrlUtils.getDomain(request.getUrl()));
}
// 将 request 推送到 scheduler 内存阻塞队列中去
scheduler.push(request, this);
}
这样就构成了一个典型的生产者 / 消费者模式代码实现。
4. processRequest()
:爬虫业务逻辑的核心方法
首先调用 Downloader 下载网页,再调用自定义的 PageProcessor 解析网页文本并从中提取出目标数据,最后调用自定义的 Pipeline 持久化目标数据。
private void processRequest(Request request) {
// 调用 Downloader 对象下载网页
Page page = downloader.download(request, this);
if (page.isDownloadSuccess()){
// 下载成功
onDownloadSuccess(request, page);
} else {onDownloaderFail(request);
}
}
private void onDownloadSuccess(Request request, Page page) {if (site.getAcceptStatCode().contains(page.getStatusCode())){
// 调用自定义的 PageProcessor 对象(通过入口类设置)解析封装网页的 Page 对象
pageProcessor.process(page);
// 添加后续需要爬取的 url 字符串,推送 Request 到 Scheduler 中去
extractAndAddRequests(page, spawnUrl);
if (!page.getResultItems().isSkip()) {for (Pipeline pipeline : pipelines) {
// 调用自定义的 Pipeline 对象(通过入口类设置)持久化目标数据,通过 ResultItems 对象封装传递
pipeline.process(page.getResultItems(), this);
}
}
} else {logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode());
}
sleep(site.getSleepTime());
return;
}
private void onDownloaderFail(Request request) {if (site.getCycleRetryTimes() == 0) {sleep(site.getSleepTime());
} else {
// for cycle retry
// 下载失败后重试
doCycleRetry(request);
}
}
protected void extractAndAddRequests(Page page, boolean spawnUrl) {
// 添加 target Request
if (spawnUrl && CollectionUtils.isNotEmpty(page.getTargetRequests())) {for (Request request : page.getTargetRequests()) {
// 推送 Request 到 Scheduler 内存阻塞队列中去
addRequest(request);
}
}
}
5. onSuccess()
/ onError()
:监听 Request 处理成功或失败的情况
只有需要自定义 SpiderListener
监听器时才会使用到,这里不是重点,不再赘述,具体使用方法可参考源码 https://github.com/xiawq87/sp… 中的实现。
小结
通过上述源码分析的过程,让我们可以大体了解 WebMagic 内部生产者 / 消费者模式的实现方式。