cmatrixprobe

发呆业务爱好者

从零开始Golang爬虫(六)| 多任务

前几天我实现了基于Elasticsearch的数据存储,又提到了同时爬取多个网站,而多任务爬虫势必要解决一系列问题:

  • 断点续爬:爬到一半中断后继续爬取,需要保证不能重复爬取
  • 数据去重:从一个网站中爬取到重复的数据,只进行一次插入操作
  • 增量更新:多个网站中都有相同的电影,只更新部分字段,可以全量或增量更新

想要解决这些问题且保证爬虫的性能,我认为最好的办法是引入Redis,毕竟单机十万QPS

Redis

连接池

package conn

import (
    "github.com/gomodule/redigo/redis"
    "log"
)

var pool *redis.Pool

func init() {
    InitPool()
}

func InitPool() {
    const (
        db        = 0
        network   = "tcp"
        password  = ""
        MaxIdle   = 200
        MaxActive = 0
        address   = "127.0.0.1:6379"
    )

    pool = &redis.Pool{
        MaxIdle:   MaxIdle,
        MaxActive: MaxActive,
        Wait:      true,
        Dial: func() (c redis.Conn, err error) {
            c, err = redis.Dial(network, address, redis.DialPassword(password), redis.DialDatabase(db))
            if err != nil {
                log.Fatal(err)
            }
            return c, nil
        },
    }
}

func GetRedisPool() *redis.Pool {
    return pool
}

数据操作

分别定义:

  • 自增string:生成es的id,同时记录数量

  • hash:用于记录每个电影对应的id

  • set或bloom filter:记录item,避免重复爬取

package redisapi

import (
    "github.com/gomodule/redigo/redis"
    "log"
    "moviecrawl/common/conn"
)

// AutoIncrID 自增id
func AutoIncrID(key string) int64 {
    c := conn.GetRedisPool().Get()
    defer c.Close()

    res, err := redis.Int64(c.Do("INCR", key+"id"))
    if err != nil {
        log.Panicf("Auto increment %s: %s", key, err)
    }
    return res
}

// HashSet 插入数据到Hash
func HashSet(key, field, value string) {
    c := conn.GetRedisPool().Get()
    defer c.Close()

    _, err := c.Do("HSET", key, field, value)
    if err != nil {
        log.Panicf("HSET %s %s %s: %s", key, field, value, err)
    }
}

// HashGet 从Hash查询数据
func HashGet(key, field string) (string, bool) {
    c := conn.GetRedisPool().Get()
    defer c.Close()

    id, err := redis.String(c.Do("HGET", key, field))
    if err == redis.ErrNil {
        return "", false
    }
    if err != nil {
        log.Panicf("HGET %s %s: %s", key, field, err)
    }
    return id, true
}

// SetSadd 向set中增加数据
func SetSadd(key string, members ...string) {
    c := conn.GetRedisPool().Get()
    defer c.Close()

    cmd := []string{key + "set"}
    cmd = append(cmd, members...)

    obj := make([]interface{}, len(cmd))
    for i := range cmd {
        obj[i] = cmd[i]
    }

    _, err := c.Do("SADD", obj...)
    if err != nil {
        log.Panicf("SADD %s %s: %s", key, members, err)
    }
}

// SetSadd 向set中增加数据
func SetIsmember(key string, member string) bool {
    c := conn.GetRedisPool().Get()
    defer c.Close()

    exists, err := redis.Bool(c.Do("SISMEMBER", key+"set", member))
    if err != nil {
        log.Panicf("SISMEMBER %s %s: %s", key, member, err)
    }
    return exists
}

// BFMadd 向bloomfilter中增加数据
func BFMadd(key string, options ...string) {
    c := conn.GetRedisPool().Get()
    defer c.Close()

    cmd := []string{key + "bf"}
    cmd = append(cmd, options...)

    obj := make([]interface{}, len(cmd))
    for i := range cmd {
        obj[i] = cmd[i]
    }

    _, err := c.Do("BF.MADD", obj...)
    if err != nil {
        log.Panicf("BF.MADD %s %s: %s", key, options, err)
    }
}

// BFMadd 判断数据是否存在于bloomfilter
func BFExists(key string, option string) bool {
    c := conn.GetRedisPool().Get()
    defer c.Close()

    exists, err := redis.Bool(c.Do("BF.EXISTS", key+"bf", option))
    if err != nil {
        log.Panicf("BF.EXISTS %s %s: %s", key, option, err)
    }
    return exists
}

断点续爬

在上一步中已经定义了set或者布隆过滤器的读写操作,当数据量特别大的时候适合用布隆过滤器,空间和时间都是常数级,但存在一定误差(1%左右),即hash冲突导致后来者被判为已存在

如果用redis中布隆过滤器需要额外的docker镜像redislabs/rebloom

因为我这里爬取的电影数量级不过为万级,所以直接用set就可以

package repo

import (
    redisapi "moviecrawl/common/redis"
)

// URLExist 判断url是否存在
func URLExist(url string) bool {
    //if redisapi.BFExists("url", url) {
    //  return true
    //}
    if redisapi.SetIsmember("url", url) {
        return true
    }
    return false
}

// SetURL 记录url
func SetURL(url string) {
    //redisapi.BFMadd("url", url)
    redisapi.SetSadd("url", url)
}

在fetch之前进行判断

func DoWork(request Request) (ParseResult, error) {
    // 判断url是否已处理
    if repo.URLExist(request.URL) {
        return ParseResult{}, fmt.Errorf("URL has benn crawled: %s", request.URL)
    }
    body, err := fetch.Fetch(request.URL)
    if err != nil {
        return ParseResult{}, err
    }
    return request.ParseFunc(body, request.URL), nil
}

发布到pipeline后记录

func (e *Engine) Publish(item Item) {
    for pip := range e.pipelineMap {
        pip <- item
    }
    repo.SetURL(item.Refer)
}

去重与更新

更新可以分为增量更新、全量更新与不更新,可以通过map来记录更新策略。reflect包可以在运行时检测待更新的interface{}类型的Data,在本例中也就是model.Movie

将map与struct中的字段进行对比,并根据策略生成待更新的数据,分别进行增量与全量更新

因此重构整个ElasticsearchHandler

package pipeline

import (
    "encoding/json"
    "log"
    "moviecrawl/common/conn"
    "moviecrawl/common/es"
    redisapi "moviecrawl/common/redis"
    "moviecrawl/engine"
    "moviecrawl/util"
    "reflect"
    "strconv"
)

// 更新策略
const (
    updateIncr = iota
    updateFull
)

// ElasticsearchHandler 存储到es
type ElasticsearchHandler struct {
    index     string
    mapping   string
    updateMap map[string]int
}

// NewESHandler 根据index和mapping返回一个ElasticsearchHandler实例
func NewESHandler(index, filepath string) *ElasticsearchHandler {
    conn.InitES()
    return &ElasticsearchHandler{
        index:     index,
        mapping:   util.ReadFileToString(filepath),
        updateMap: make(map[string]int),
    }
}

// FullUpdate 全量更新的字段
func (h *ElasticsearchHandler) FullUpdate(fields ...string) {
    for _, field := range fields {
        h.updateMap[field] = updateFull
    }
}

// IncrUpdate 增量更新的字段 只有数组数组字段才能增量更新
func (h *ElasticsearchHandler) IncrUpdate(fields ...string) {
    for _, field := range fields {
        h.updateMap[field] = updateIncr
    }
}

// Handle 实现PipelineHandler接口
func (h ElasticsearchHandler) Handle(pip engine.Pipeline) {
    // 创建index
    es.CreateIndex(h.index, h.mapping)
    // 存储item
    for item := range pip {
        h.store(item)
    }
}

func (h ElasticsearchHandler) store(item engine.Item) {
    // redis中存在则更新
    if id, ok := redisapi.HashGet(item.Type, item.Key); ok {
        log.Printf("Item already exists in %s: %s", item.Type, item.Key)
        incr, full := h.updateFields(item)
        h.incrUpdate(id, incr)
        h.fullUpdate(id, full)
        return
    }
    // id自增并插入
    item.Id = redisapi.AutoIncrID(item.Type)
    es.Put(h.index, &item)
    // 插入记录到redis
    redisapi.HashSet(item.Type, item.Key, strconv.FormatInt(item.Id, 10))
}

// incrUpdate 增量更新
func (h ElasticsearchHandler) incrUpdate(id string, fields map[string][]interface{}) {
    for field, values := range fields {
        es.UpdateByScript(h.index, id, field, values...)
    }
}

// fullUpdate 全量更新
func (h ElasticsearchHandler) fullUpdate(id string, fields map[string]interface{}) {
    for field, values := range fields {
        es.UpdateByField(h.index, id, field, values)
    }
}

// updateFields 按配置提取出要更新的字段
func (h ElasticsearchHandler) updateFields(item engine.Item) (map[string][]interface{}, map[string]interface{}) {
    incr := make(map[string][]interface{})
    full := make(map[string]interface{})
    rType := reflect.TypeOf(item.Data)
    rValue := reflect.ValueOf(item.Data)

    // 遍历struct
    for i := 0; i < rType.NumField(); i++ {
        fieldName := rType.Field(i).Name
        if t, ok := h.updateMap[fieldName]; ok {
            value := rValue.Field(i).Interface()
            switch t {
            case updateIncr:
                var inter []interface{}
                bytes, _ := json.Marshal(value)
                if err := json.Unmarshal(bytes, &inter); err != nil {
                    log.Fatalf("Unmarshal %s: %s", bytes, err)
                }
                incr[fieldName] = inter
            case updateFull:
                full[fieldName] = value
            }
        }
    }
    return incr, full
}

Elasticsearch的增量更新可以通过脚本实现

// UpdateByField 更新某一字段
func UpdateByField(index string, id string, field string, value interface{}) {
    client := conn.GetESClient()
    _, err := client.Update().Index(index).Id(id).Doc(map[string]interface{}{field: value}).Do(context.Background())
    if err != nil {
        log.Fatalf("Update %s with %v: %s", field, value, err)
    }
}

// UpdateByScript 基于脚本增量更新
func UpdateByScript(index string, id string, field string, values ...interface{}) {
    client := conn.GetESClient()
    format := "if(ctx._source.%s==null)" +
        "{ctx._source.%[1]s=[params.value]}" +
        "else if(ctx._source.%[1]s.indexOf(params.value) < 0)" +
        "{ctx._source.%[1]s.add(params.value)}"
    code := fmt.Sprintf(format, field)
    for _, value := range values {
        script := elastic.
            NewScript(code).
            Param("value", value)
        _, err := client.Update().Index(index).Id(id).
            Script(script).
            Do(context.Background())
        if err != nil {
            panic(err)
        }
    }
}

更新策略

可以通过调用FullUpdate和IncrUpdate设置全量更新和增量更新的字段,不设置即不更新

在本例中只对下载链接进行增量更新

func main() {
    e := engine.NewEngine()

    esHandler := pipeline.NewESHandler("movies", "config/movieMapping.json")
    esHandler.IncrUpdate("Download")
    e.Pip(esHandler)

    e.Use(middleware.PrintStdout)

    task80s := NewTask("https://www.80s.tw/movie/list/-----p/", 1, 100)
    requests := task80s.InitRequests()
    e.Run(requests...)
}

小结

本节通过redis解决了一系列多任务爬虫的问题,在此基础上就可以通过NewTask实例化多个网站的爬虫任务了

从零开始Golang爬虫(五)| 数据存储

上一篇

go modules简单使用及踩坑

下一篇
评论
头像 发表评论 说点什么
还没有评论