cmatrixprobe

发呆业务爱好者

从零开始Golang爬虫(四)| 中间件

从零开始Golang爬虫(三)| 并发调度中,我实现了一个简单的并发调度器,并将结果输出到stdout,考虑如下代码:

    for {
        result := <-e.ResultChan()

        // 假装输出信息到stdout...

        // 发送下一层的request
        for _, request := range result.Requests {
            go e.Submit(request)
        }
    }

当我们获取到result时输出信息,并将request重新提交给调度器,但如果以后要对result进行其他处理,比如:打印到日志文件、统计爬虫速度……,代码就会变成这样:

    for {
        result := <-e.ResultChan()

        // 1.假装输出信息到stdout...
        // 2.假装输出信息到日志文件...
        // 3.假装输出爬虫耗时...

        // 发送下一层的request
        for _, request := range result.Requests {
            go e.Submit(request)
        }
    }

将大量的非业务代码与业务代码糅合在一起,项目的可维护性将变得非常低,这时中间件就派上用场了


结构定义

package engine

type HandleResult func(result ParseResult)

type middleware func(HandleResult) HandleResult

因为我们要对result做多次处理,因此将处理函数同一定义为HandleResult,而中间件middleware就是一个将HandleResult包装之后再返回的函数

修改engine

package engine

import (
    "moviecrawl/fetch"
)

// Engine 并发版引擎
type Engine struct {
    Scheduler
    HandleResult
    WorkerCount int
    middlewareChain []middleware
}

// NewEngine 返回默认的engine实例
func NewEngine() *Engine {
    e := &Engine{
        Scheduler:   new(SimpleScheduler),
        WorkerCount: 500,
    }
    e.HandleResult = func(result ParseResult) {
        for _, request := range result.Requests {
            go e.Submit(request)
        }
    }
    return e
}

// Use 将中间件放入middlewareChain
func (e *Engine) Use(ms ...middleware) {
    e.middlewareChain = append(e.middlewareChain, ms...)
}

// ApplyMiddlewareChain 应用所有中间件
func (e *Engine) ApplyMiddlewareChain() {
    for i := len(e.middlewareChain) - 1; i >= 0; i-- {
        e.HandleResult = e.middlewareChain[i](e.HandleResult)
    }
}

// Run 运行并发爬虫
func (e *Engine) Run(seeds ...Request) {
    e.ApplyMiddlewareChain()
    e.Scheduler.Run()

    // 按照数量创建worker
    for i := 0; i < e.WorkerCount; i++ {
        go e.CreateWorker()
    }

    // 提交所有request
    for _, seed := range seeds {
        go e.Submit(seed)
    }

    // 处理result
    for {
        result := <-e.ResultChan()
        e.HandleResult(result)
    }
}

这里需要注意的是为了保证执行顺序的正确性,中间件的Use顺序应该与Apply顺序是相反的,因为函数栈是后进先出的,类似于递归的执行过程,可以参考Go高级编程中对web中间件的解释:

中间件执行流程

增加中间件

print

package middleware

import (
    "log"
    "moviecrawl/engine"
)

// PrintStdout 第一次输出
func PrintStdout(next engine.HandleResult) engine.HandleResult {
    var itemCount int
    return func(result engine.ParseResult) {
        if result.Requests == nil {
            for _, item := range result.Items {
                itemCount++
                log.Printf("Got item %d: %v", itemCount, item)
            }
        } else {
            for _, item := range result.Items {
                log.Printf("Got item: %v", item)
            }
        }

        next(result)
    }
}

// // PrintStdout 第二次输出,用于测试
func PrintStdoutSecond(next engine.HandleResult) engine.HandleResult {
    var itemCount int
    return func(result engine.ParseResult) {
        if result.Requests == nil {
            for _, item := range result.Items {
                itemCount++
                log.Printf("[Second]Got item %d: %v", itemCount, item)
            }
        } else {
            for _, item := range result.Items {
                log.Printf("[Second]Got item: %v", item)
            }
        }

        next(result)
    }
}
  • 为了便于测试,写两个Print中间件,其中next(result)表示调用内层的函数,逐层嵌套,最终调用默认的HandleResult
  • 如果想要在执行HandleResult之后调用中间件,将程序写在next(result)之后即可

elapsed

package middleware

import (
    "log"
    "moviecrawl/engine"
    "os"
    "sync/atomic"
    "time"
)

// Elapsed 统计耗时
func Elapsed(next engine.HandleResult) engine.HandleResult {
    start := time.Now()
    var count int64
    times := []int64{10000}

    return func(result engine.ParseResult) {
        if result.Requests == nil {
            for range result.Items {
                atomic.AddInt64(&count, 1)
                if atomic.LoadInt64(&count) == times[0] {
                    elapsed := time.Since(start).Seconds()
                    log.Printf("Elapsed %fs, crawl %d items.", elapsed, count)
                    times = times[1:]
                    if len(times) == 0 {
                        os.Exit(0)
                    }
                }
            }
        }

        next(result)
    }
}

main

最后修改main函数,采用Use的方式应用中间件

func main() {
    e := engine.NewEngine()
    //  或者 e.Use(middleware.PrintStdout, middleware.PrintStdoutSecond)
    e.Use(middleware.PrintStdout)
    e.Use(middleware.PrintStdoutSecond)

    task80s := NewTask("https://www.80s.tw/movie/list/-----p/", 1, 100)
    requests := task80s.InitRequests()
    e.Run(requests...)
}

运行结果

运行结果

可以看到执行顺序无误

小结

本节基于Golang的函数式编程简单地实现了result处理中间件,将非业务代码从业务中剥离,极大提高了代码可读性与可维护性。

从零开始Golang爬虫(三)| 并发调度

上一篇

从零开始Golang爬虫(五)| 数据存储

下一篇
评论
头像 发表评论 说点什么
还没有评论
1445
0