乐趣区

webmagic源码分析

前言

在文章《webmagic 核心设计和运行机制分析》中已经提到 WebMagic 内部是通过生产者 / 消费者模式来实现的,本篇我们就分析一下 WebMagic 的源代码,先从爬虫入口类 main 方法开始。

爬虫入口类 main 方法

public static void main(String[] args) {Spider.create(new GithubRepoPageProcessor())
            // 从 https://github.com/code4craft 开始抓    
            .addUrl("https://github.com/code4craft")
            // 设置 Scheduler,使用 Redis 来管理 URL 队列
            .setScheduler(new RedisScheduler("localhost"))
            // 设置 Pipeline,将结果以 json 方式保存到文件
            .addPipeline(new JsonFilePipeline("D:\\data\\webmagic"))
            // 开启 5 个线程同时执行
            .thread(5)
            // 启动爬虫
            .run();}

通过官方给出的创建爬虫入口类的样例代码可以看到,启动爬虫是调用 Spider.run() 方法。

Spider 类源码分析

1. Spider.run()方法

checkRunningStat()检查运行状态,不是很重要,跳过。

@Override
public void run() {
    // 检查运行状态
    checkRunningStat();
    // 初始化组件
    initComponent();
    logger.info("Spider {} started!",getUUID());
    // 死循环从 Scheduler 中拉取 Request(Request 中封装了 url)while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {final Request request = scheduler.poll(this);
        if (request == null) {if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {break;}
            // wait until new url added
            // 当 Scheduler 中不存在 Request 时,线程等待
            waitNewUrl();} else {threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 处理 Request,核心方法
                        processRequest(request);
                        // 调用监听器 onSuccess()方法
                        onSuccess(request);
                    } catch (Exception e) {// 调用监听器 onError()方法
                        onError(request);
                        logger.error("process request" + request + "error", e);
                    } finally {pageCount.incrementAndGet();
                        // 唤醒线程
                        signalNewUrl();}
                }
            });
        }
    }
    stat.set(STAT_STOPPED);
    // release some resources
    if (destroyWhenExit) {close();
    }
    logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
}

2. initComponent()初始化组件:设置默认 Downloader 实现,初始化线程池

// 初始化组件
protected void initComponent() {if (downloader == null) {
        // 默认使用 HttpClientDownloader
        this.downloader = new HttpClientDownloader();}
    if (pipelines.isEmpty()) {pipelines.add(new ConsolePipeline());
    }
    downloader.setThread(threadNum);
    // 初始化线程池
    if (threadPool == null || threadPool.isShutdown()) {if (executorService != null && !executorService.isShutdown()) {threadPool = new CountableThreadPool(threadNum, executorService);
        } else {threadPool = new CountableThreadPool(threadNum);
        }
    }
    if (startRequests != null) {for (Request request : startRequests) {addRequest(request);
        }
        startRequests.clear();}
    startTime = new Date();}

3. waitNewUrl() / signalNewUrl():配合 Scheduler 对象实现生产者 / 消费者模式

// 线程等待
private void waitNewUrl() {newUrlLock.lock();
    try {
        //double check
        if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {return;}
        newUrlCondition.await(emptySleepTime, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {logger.warn("waitNewUrl - interrupted, error {}", e);
    } finally {newUrlLock.unlock();
    }
}

// 线程唤醒
private void signalNewUrl() {
    try {newUrlLock.lock();
        newUrlCondition.signalAll();} finally {newUrlLock.unlock();
    }
}

在 Spider 类属性中包含了默认 Scheduler 实现类 QueueScheduler 的对象 scheduler,而在 QueueScheduler 类中默认使用内存阻塞队列来存储 Request。

// Spider 类属性
protected Scheduler scheduler = new QueueScheduler();
public class QueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler {
    // 默认使用内存阻塞队列来存储 Request 对象
    private BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>();

    @Override
    public void pushWhenNoDuplicate(Request request, Task task) {queue.add(request);
    }

    @Override
    public Request poll(Task task) {return queue.poll();
    }

    @Override
    public int getLeftRequestsCount(Task task) {return queue.size();
    }
    
    @Override
    public int getTotalRequestsCount(Task task) {return getDuplicateRemover().getTotalRequestsCount(task);
    }
}

看到这里,有人可能会疑惑当阻塞队列中没有 Request 对象时,线程会卡死在 Spider.run() 方法中 waitNewUrl() 上。

其实只要我们看 Spider.addUrl()Spider.addRequest()两个方法的源码,就会发现其中调用了线程唤醒方法 Spider.signalNewUrl(),而在爬虫入口类中必然会调addUrl()addRequest()其中一个来设置起始 url,所以不存在线程运行卡死的情况。

public Spider addUrl(String... urls) {for (String url : urls) {addRequest(new Request(url));
    }
    // 调用线程唤醒方法
    signalNewUrl();
    return this;
}

public Spider addRequest(Request... requests) {for (Request request : requests) {addRequest(request);
    }
    // 调用线程唤醒方法
    signalNewUrl();
    return this;
}

private void addRequest(Request request) {if (site.getDomain() == null && request != null && request.getUrl() != null) {site.setDomain(UrlUtils.getDomain(request.getUrl()));
    }
    // 将 request 推送到 scheduler 内存阻塞队列中去
    scheduler.push(request, this);
}

这样就构成了一个典型的生产者 / 消费者模式代码实现。

4. processRequest():爬虫业务逻辑的核心方法

首先调用 Downloader 下载网页,再调用自定义的 PageProcessor 解析网页文本并从中提取出目标数据,最后调用自定义的 Pipeline 持久化目标数据。

private void processRequest(Request request) {
    // 调用 Downloader 对象下载网页
    Page page = downloader.download(request, this);
    if (page.isDownloadSuccess()){
        // 下载成功
        onDownloadSuccess(request, page);
    } else {onDownloaderFail(request);
    }
}

private void onDownloadSuccess(Request request, Page page) {if (site.getAcceptStatCode().contains(page.getStatusCode())){
        // 调用自定义的 PageProcessor 对象(通过入口类设置)解析封装网页的 Page 对象
        pageProcessor.process(page);
        // 添加后续需要爬取的 url 字符串,推送 Request 到 Scheduler 中去
        extractAndAddRequests(page, spawnUrl);
        if (!page.getResultItems().isSkip()) {for (Pipeline pipeline : pipelines) {
                // 调用自定义的 Pipeline 对象(通过入口类设置)持久化目标数据,通过 ResultItems 对象封装传递
                pipeline.process(page.getResultItems(), this);
            }
        }
    } else {logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode());
    }
    sleep(site.getSleepTime());
    return;
}

private void onDownloaderFail(Request request) {if (site.getCycleRetryTimes() == 0) {sleep(site.getSleepTime());
    } else {
        // for cycle retry
        // 下载失败后重试
        doCycleRetry(request);
    }
}

protected void extractAndAddRequests(Page page, boolean spawnUrl) {
    // 添加 target Request
    if (spawnUrl && CollectionUtils.isNotEmpty(page.getTargetRequests())) {for (Request request : page.getTargetRequests()) {
            // 推送 Request 到 Scheduler 内存阻塞队列中去
            addRequest(request);
        }
    }
}

5. onSuccess() / onError():监听 Request 处理成功或失败的情况

只有需要自定义 SpiderListener 监听器时才会使用到,这里不是重点,不再赘述,具体使用方法可参考源码 https://github.com/xiawq87/sp… 中的实现。

小结

通过上述源码分析的过程,让我们可以大体了解 WebMagic 内部生产者 / 消费者模式的实现方式。

退出移动版