cmatrixprobe

发呆业务爱好者

从零开始Golang爬虫(三)| 并发调度

从零开始Golang爬虫(二)| 初步完成中,我初步实现了基于slice队列的电影网站爬虫,但是速度慢的感人,爬取一万条数据预计要3个多小时,本节中通过实现一个简单的调度器实现并发爬虫,大幅加快爬取速度。
调度模型


由于解耦带来的优势,之前的Fetch和Parse部分不用做任何修改,只需要修改engine

调度器

基于Scheduler接口实现一个简单的调度器

package engine

// Scheduler 调度器
type Scheduler interface {
    Submit(Request)               // 发送请求
    WorkChan() chan Request       // 返回工作channel
    ResultChan() chan ParseResult // 返回结果channel
    Run()                         // 初始化channel
}

// SimpleScheduler 简单调度器
type SimpleScheduler struct {
    workChan   chan Request
    resultChan chan ParseResult
}

func (s *SimpleScheduler) Submit(request Request) {
    s.workChan <- request
}

func (s *SimpleScheduler) WorkChan() chan Request {
    return s.workChan
}

func (s *SimpleScheduler) ResultChan() chan ParseResult {
    return s.resultChan
}

func (s *SimpleScheduler) Run() {
    s.workChan = make(chan Request)
    s.resultChan = make(chan ParseResult)
}

并发引擎

package engine

import (
    "log"
    "moviecrawl/fetch"
)

// Engine 并发版引擎
type Engine struct {
    Scheduler
    WorkerCount int
}

// Run 运行并发爬虫
func (e *Engine) Run(seeds ...Request) {
    e.Scheduler.Run()

    // 按照数量创建worker
    for i := 0; i < e.WorkerCount; i++ {
        go e.CreateWorker()
    }

    // 提交所有request
    for _, seed := range seeds {
        go e.Submit(seed)
    }

    var itemCount int
    for {
        result := <-e.ResultChan()
        for _, item := range result.Items {
            // 电影信息输出递增序号
            if result.Requests == nil {
                itemCount++
                log.Printf("Got item: %d, %v", itemCount, item)
            } else {
                log.Printf("Got item: %v", item)
            }
        }
        // 发送下一层的request
        for _, request := range result.Requests {
            go e.Submit(request)
        }
    }
}

// CreateWorker 创建工人
func (e *Engine) CreateWorker() {
    for {
        request := <-e.WorkChan()
        result, err := DoWork(request)
        if err != nil {
            log.Println(err)
            continue
        }
        if result.Items == nil {
            log.Println("No results returned.")
        }
        e.ResultChan() <- result
    }
}

// DoWork 处理请求并返回结果
func DoWork(request Request) (ParseResult, error) {
    body, err := fetch.Fetch(request.URL)
    if err != nil {
        return ParseResult{}, err
    }
    return request.ParseFunc(body), nil
}

初始化并运行engine

package main

import (
    "log"
    "moviecrawl/engine"
    _ "net/http/pprof"
)

func init() {
    log.SetFlags(log.Lshortfile)
    go http.ListenAndServe(":8888", nil)
}

func main() {
    e := engine.Engine{
        Scheduler:   new(engine.SimpleScheduler),
        WorkerCount: 500,
    }

    task80s := NewTask("https://www.80s.tw/movie/list/-----p/", 1, 100)
    requests := task80s.InitRequests()
    e.Run(requests...)
}

运行结果

并发爬虫耗时

创建500个worker并发爬取10000条数据只需40秒

小结

本节对engine进行重构并实现了最基础的并发调度器,爬取速度提升了数百倍。

从零开始Golang爬虫(二)| 初步完成

上一篇

从零开始Golang爬虫(四)| 中间件

下一篇
评论
头像 发表评论 说点什么
还没有评论
1362
0