从零开始Golang爬虫(四)| 中间件
在从零开始Golang爬虫(三)| 并发调度中,我实现了一个简单的并发调度器,并将结果输出到stdout,考虑如下代码:
for {
result := <-e.ResultChan()
// 假装输出信息到stdout...
// 发送下一层的request
for _, request := range result.Requests {
go e.Submit(request)
}
}
当我们获取到result时输出信息,并将request重新提交给调度器,但如果以后要对result进行其他处理,比如:打印到日志文件、统计爬虫速度……,代码就会变成这样:
for {
result := <-e.ResultChan()
// 1.假装输出信息到stdout...
// 2.假装输出信息到日志文件...
// 3.假装输出爬虫耗时...
// 发送下一层的request
for _, request := range result.Requests {
go e.Submit(request)
}
}
将大量的非业务代码与业务代码糅合在一起,项目的可维护性将变得非常低,这时中间件就派上用场了
结构定义
package engine
type HandleResult func(result ParseResult)
type middleware func(HandleResult) HandleResult
因为我们要对result做多次处理,因此将处理函数同一定义为HandleResult,而中间件middleware就是一个将HandleResult包装之后再返回的函数
修改engine
package engine
import (
"moviecrawl/fetch"
)
// Engine 并发版引擎
type Engine struct {
Scheduler
HandleResult
WorkerCount int
middlewareChain []middleware
}
// NewEngine 返回默认的engine实例
func NewEngine() *Engine {
e := &Engine{
Scheduler: new(SimpleScheduler),
WorkerCount: 500,
}
e.HandleResult = func(result ParseResult) {
for _, request := range result.Requests {
go e.Submit(request)
}
}
return e
}
// Use 将中间件放入middlewareChain
func (e *Engine) Use(ms ...middleware) {
e.middlewareChain = append(e.middlewareChain, ms...)
}
// ApplyMiddlewareChain 应用所有中间件
func (e *Engine) ApplyMiddlewareChain() {
for i := len(e.middlewareChain) - 1; i >= 0; i-- {
e.HandleResult = e.middlewareChain[i](e.HandleResult)
}
}
// Run 运行并发爬虫
func (e *Engine) Run(seeds ...Request) {
e.ApplyMiddlewareChain()
e.Scheduler.Run()
// 按照数量创建worker
for i := 0; i < e.WorkerCount; i++ {
go e.CreateWorker()
}
// 提交所有request
for _, seed := range seeds {
go e.Submit(seed)
}
// 处理result
for {
result := <-e.ResultChan()
e.HandleResult(result)
}
}
这里需要注意的是为了保证执行顺序的正确性,中间件的Use顺序应该与Apply顺序是相反的,因为函数栈是后进先出的,类似于递归的执行过程,可以参考Go高级编程中对web中间件的解释:
增加中间件
package middleware
import (
"log"
"moviecrawl/engine"
)
// PrintStdout 第一次输出
func PrintStdout(next engine.HandleResult) engine.HandleResult {
var itemCount int
return func(result engine.ParseResult) {
if result.Requests == nil {
for _, item := range result.Items {
itemCount++
log.Printf("Got item %d: %v", itemCount, item)
}
} else {
for _, item := range result.Items {
log.Printf("Got item: %v", item)
}
}
next(result)
}
}
// // PrintStdout 第二次输出,用于测试
func PrintStdoutSecond(next engine.HandleResult) engine.HandleResult {
var itemCount int
return func(result engine.ParseResult) {
if result.Requests == nil {
for _, item := range result.Items {
itemCount++
log.Printf("[Second]Got item %d: %v", itemCount, item)
}
} else {
for _, item := range result.Items {
log.Printf("[Second]Got item: %v", item)
}
}
next(result)
}
}
- 为了便于测试,写两个Print中间件,其中next(result)表示调用内层的函数,逐层嵌套,最终调用默认的HandleResult
- 如果想要在执行HandleResult之后调用中间件,将程序写在next(result)之后即可
elapsed
package middleware
import (
"log"
"moviecrawl/engine"
"os"
"sync/atomic"
"time"
)
// Elapsed 统计耗时
func Elapsed(next engine.HandleResult) engine.HandleResult {
start := time.Now()
var count int64
times := []int64{10000}
return func(result engine.ParseResult) {
if result.Requests == nil {
for range result.Items {
atomic.AddInt64(&count, 1)
if atomic.LoadInt64(&count) == times[0] {
elapsed := time.Since(start).Seconds()
log.Printf("Elapsed %fs, crawl %d items.", elapsed, count)
times = times[1:]
if len(times) == 0 {
os.Exit(0)
}
}
}
}
next(result)
}
}
main
最后修改main函数,采用Use的方式应用中间件
func main() {
e := engine.NewEngine()
// 或者 e.Use(middleware.PrintStdout, middleware.PrintStdoutSecond)
e.Use(middleware.PrintStdout)
e.Use(middleware.PrintStdoutSecond)
task80s := NewTask("https://www.80s.tw/movie/list/-----p/", 1, 100)
requests := task80s.InitRequests()
e.Run(requests...)
}
运行结果
可以看到执行顺序无误
小结
本节基于Golang的函数式编程简单地实现了result处理中间件,将非业务代码从业务中剥离,极大提高了代码可读性与可维护性。