Golang Crawler from Scratch (5) | Data Storage
In the previous part I used a functional-programming style to implement the Print middleware, which wraps up the crawling stage. Once the data has been crawled it obviously has to be stored somewhere. In the past I would probably have dropped it into MySQL, but this time around we have learned Elasticsearch, a distributed full-text search engine built on Lucene, and since we ultimately want to provide a search service anyway, let's give it a try...
func PrintStdout(next engine.HandleResult) engine.HandleResult {
var itemCount int
return func(result engine.ParseResult) {
// print the item info and count movies with itemCount
next(result)
}
}
Looking back at the Print middleware written earlier, it uses itemCount to keep a tally of movies. But once we want to measure how long it takes to crawl x items, store them into Elasticsearch, or merge data crawled from other movie sites (I have in fact already crawled three more sites, with much the same code), itemCount is needed in all of those places, so it can no longer be a simple local variable.
We may also want to record the URL each piece of data came from... so it's time to redesign the parse-result data structures.
Structure Definitions
package engine
import "io"
// Request describes a crawl request
type Request struct {
URL string
ParseFunc func(io.ReadCloser, string) ParseResult
}
// ParseResult holds the parsed items and follow-up requests
type ParseResult struct {
Items []Item
Requests []Request
}
// Item is one piece of crawled data
type Item struct {
Id int64 // auto-increment id
Type string // item type, e.g. "movie"
Refer string // source URL the item was parsed from
Key string // display key, e.g. name:director
Data interface{} // the payload itself
}
Let's first write a simple, concurrency-safe auto-increment generator; if the crawler is later extended into a distributed one, it could be reimplemented on top of Redis (a sketch of that follows the snippet below).
package util
import "sync/atomic"
var id int64
// AutoIncr returns the next auto-incremented id
func AutoIncr() int64 {
return atomic.AddInt64(&id, 1)
}
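For reference, here is a minimal sketch of what the Redis-backed variant could look like. It assumes the github.com/go-redis/redis/v8 client and a Redis instance on 127.0.0.1:6379; the key name crawler:item:id and the function AutoIncrDistributed are made up for illustration.
package util

import (
	"context"

	"github.com/go-redis/redis/v8"
)

var rdb = redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})

// AutoIncrDistributed hands out globally unique ids by delegating to Redis INCR,
// which is atomic on the server side and therefore safe across crawler processes.
func AutoIncrDistributed() (int64, error) {
	return rdb.Incr(context.Background(), "crawler:item:id").Result()
}
The in-process version above stays as the default; the Redis variant only matters once several crawler nodes have to agree on ids.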
Corresponding Changes
movie
func ParseMovie(body io.ReadCloser, refer string) engine.ParseResult {
// ...
var result engine.ParseResult
result.Items = append(result.Items, engine.Item{
Id: util.AutoIncr(),
Type: "movie",
Refer: refer,
Data: movie,
Key: strings.Join([]string{
movie.Name,
movie.Director,
}, ":"),
})
return result
}
movielist
func ParseMovieList(body io.ReadCloser, refer string) engine.ParseResult {
// ......
result.Items = append(result.Items, engine.Item{
Refer: refer,
Data: name,
})
// ...
}
engine
func DoWork(request Request) (ParseResult, error) {
body, err := fetch.Fetch(request.URL)
if err != nil {
return ParseResult{}, err
}
return request.ParseFunc(body, request.URL), nil
}
Pipeline
Storing data is just one kind of data processing. We can give every data-processing routine its own pipeline (implemented as a channel in Go): upstream, the engine fans items out into the pipelines; downstream, anything that implements the PipelineHandler interface consumes them and does its own processing. This is essentially the publish-subscribe pattern (a tiny example subscriber is sketched after the engine code below).
package engine
type Pipeline chan Item
type PipelineHandler interface {
Handle(Pipeline)
}
// Engine is the concurrent crawler engine
type Engine struct {
// ...
pipelineMap map[Pipeline]PipelineHandler
}
// NewEngine returns an engine instance with default settings
func NewEngine() *Engine {
e := &Engine{
Scheduler: new(SimpleScheduler),
WorkerCount: 500,
pipelineMap: make(map[Pipeline]PipelineHandler),
}
e.HandleResult = func(result ParseResult) {
if result.Requests == nil {
for _, item := range result.Items {
go e.Publish(item)
}
}
for _, request := range result.Requests {
go e.Submit(request)
}
}
return e
}
// Pip creates a pipeline for each handler, registers the mapping, and starts the handler's subscription goroutine
func (e *Engine) Pip(hs ...PipelineHandler) {
for _, h := range hs {
pip := make(Pipeline)
e.pipelineMap[pip] = h
go h.Handle(pip)
}
}
// Publish fans an item out to every registered pipeline
func (e *Engine) Publish(item Item) {
for pip := range e.pipelineMap {
pip <- item
}
// record where this item came from
repo.SetURL(item.Refer)
}
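To make the subscriber side concrete, here is a hypothetical ConsoleHandler, a minimal sketch that is not part of the original code: it implements PipelineHandler and simply logs whatever it receives (shown in the pipeline package that appears later in this post). The Elasticsearch handler below follows the same shape.
package pipeline

import (
	"log"

	"moviecrawl/engine"
)

// ConsoleHandler is a trivial PipelineHandler that just logs every item.
type ConsoleHandler struct{}

// Handle blocks on the pipeline and prints each item as it arrives.
func (ConsoleHandler) Handle(pip engine.Pipeline) {
	for item := range pip {
		log.Printf("[console] item %d from %s: %v", item.Id, item.Refer, item.Data)
	}
}
Registering it is a single call, e.g. e.Pip(pipeline.ConsoleHandler{}), after which the engine's Publish starts feeding it items.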
Environment Setup
- First we need to get the Elasticsearch service running. I'm using Docker here, starting it the way the official image docs describe, only to find that the latest tag pulls a 5.x image; since I want the newer 7.4 release, the version has to be specified explicitly:
docker network create esnetwork
docker run -d --name elasticsearch --net esnetwork -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.4.2
- We might as well bring up Kibana too, or install the ElasticSearch Head extension from the Chrome Web Store:
# if not using Docker, a Node.js environment is required
docker run -d --name kibana --net esnetwork -p 5601:5601 kibana:7.4.2
- ik is a Chinese analysis plugin: the ik_max_word analyzer splits text at the finest possible granularity, while ik_smart does the opposite. There are two ways to install the ik plugin:
# the plugin author recommends installing via the plugin install command
# but in my case it stalled at around 90% because of network issues
docker exec -it elasticsearch /bin/bash
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.4.2/elasticsearch-analysis-ik-7.4.2.zip
# or download the zip and copy it into the container directly
wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.4.2/elasticsearch-analysis-ik-7.4.2.zip
mkdir ik && unzip elasticsearch-analysis-ik-7.4.2.zip -d ik
docker cp ik elasticsearch:/usr/share/elasticsearch/plugins
# finally, restart the container
docker restart elasticsearch
Defining the ES Operations
- Use github.com/olivere/elastic/v7 to establish the connection. SetSniff(false) matters here because the node runs inside Docker, and sniffing would otherwise make the client try to reach the container's internal address.
package conn
import (
"context"
"github.com/olivere/elastic/v7"
"log"
)
var client *elastic.Client
func InitES() {
url := "http://127.0.0.1:9200"
var err error
client, err = elastic.NewClient(elastic.SetURL(url), elastic.SetSniff(false))
if err != nil {
log.Fatalf("Error creating the client with %s: %s", url, err)
}
info, code, err := client.Ping(url).Do(context.Background())
if err != nil {
log.Fatalf("Ping %s: %s", url, err)
}
log.Printf("Elasticsearch returned with code %d and version %s", code, info.Version.Number)
}
func GetESClient() *elastic.Client {
return client
}
- Define the data operations
package es
import (
"context"
"log"
"moviecrawl/common/conn"
"moviecrawl/engine"
"strconv"
)
// CreateIndex creates the index from the given mapping if it does not exist yet
func CreateIndex(index string, mapping string) {
client := conn.GetESClient()
exists, err := client.IndexExists(index).Do(context.Background())
if err != nil {
log.Fatalf("Check exists %s: %s", index, err)
}
if exists {
log.Printf("Index %s already exists", index)
return
}
createIndex, err := client.CreateIndex(index).Body(mapping).Do(context.Background())
if err != nil {
log.Fatalf("Create index %s: %s", index, err)
}
if !createIndex.Acknowledged {
log.Fatalf("Not acknowledged.")
}
}
// Put indexes a single item
func Put(index string, item *engine.Item) {
client := conn.GetESClient()
_, err := client.Index().Index(index).Id(strconv.FormatInt(item.Id, 10)).
BodyJson(item.Data).Do(context.Background())
if err != nil {
log.Printf("Error inserting data %v: %s", item.Data, err)
return
}
log.Printf("[ES] Insert %d: %s", item.Id, item.Key)
}
- Write the mapping configuration into config/movieMapping.json (a small query sketch to sanity-check the ik-analyzed fields follows the mapping):
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"Name": {
"type": "text",
"analyzer": "ik_max_word"
},
"Profile": {
"type": "text",
"analyzer": "ik_max_word"
},
"Alias": {
"type": "text",
"analyzer": "ik_max_word"
},
"Actor": {
"type": "text",
"analyzer": "ik_max_word"
},
"Type": {
"type": "keyword"
},
"Origin": {
"type": "keyword"
},
"Language": {
"type": "keyword"
},
"Director": {
"type": "text",
"analyzer": "ik_max_word"
},
"Date": {
"type": "date"
},
"Duration": {
"type": "short"
},
"Score": {
"type": "scaled_float",
"scaling_factor": 10
},
"Synopsis": {
"type": "text",
"analyzer": "ik_max_word"
},
"Download": {
"type": "keyword"
}
}
}
}
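As a quick sanity check that the ik-analyzed fields behave as expected once data has been indexed, here is a sketch of a full-text query with olivere/elastic. The SearchMovies helper and the choice of fields are illustrative only and not part of the crawler itself.
package es

import (
	"context"
	"encoding/json"
	"log"

	"github.com/olivere/elastic/v7"

	"moviecrawl/common/conn"
)

// SearchMovies runs a simple multi-field full-text query against the index.
func SearchMovies(index, keyword string) {
	client := conn.GetESClient()
	// These text fields were indexed with ik_max_word, so Chinese keywords match on tokens.
	query := elastic.NewMultiMatchQuery(keyword, "Name", "Alias", "Actor", "Synopsis")
	result, err := client.Search().Index(index).Query(query).Size(10).Do(context.Background())
	if err != nil {
		log.Printf("Search %s for %q: %s", index, keyword, err)
		return
	}
	for _, hit := range result.Hits.Hits {
		var doc map[string]interface{}
		if err := json.Unmarshal(hit.Source, &doc); err == nil {
			log.Printf("hit %s: %v", hit.Id, doc["Name"])
		}
	}
}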
Implementing PipelineHandler
package pipeline
import (
"moviecrawl/common/conn"
"moviecrawl/common/es"
"moviecrawl/engine"
"moviecrawl/util"
)
// ElasticsearchHandler stores items into Elasticsearch
type ElasticsearchHandler struct {
index string
mapping string
}
// NewESHandler builds a handler from an index name and a mapping file path
func NewESHandler(index, filepath string) *ElasticsearchHandler {
conn.InitES()
return &ElasticsearchHandler{
index: index,
mapping: util.ReadFileToString(filepath),
}
}
// Handle implements PipelineHandler
func (h ElasticsearchHandler) Handle(pip engine.Pipeline) {
// create the index
es.CreateIndex(h.index, h.mapping)
// store each item as it arrives
for item := range pip {
h.store(item)
}
}
func (h ElasticsearchHandler) store(item engine.Item) {
es.Put(h.index, &item)
}
package util
import (
"io/ioutil"
"os"
"path"
)
// ReadFileToString reads the file at the given path (relative to the working directory) into a string
func ReadFileToString(filepath string) string {
dir, err := os.Getwd()
if err != nil {
panic(err)
}
f := path.Join(dir, filepath)
bytes, err := ioutil.ReadFile(f)
if err != nil {
panic(err)
}
return string(bytes)
}
Wrapping Up
- Tweak the Print middleware a bit:
package middleware
import (
"log"
"moviecrawl/engine"
)
// PrintStdout logs the parsed items
func PrintStdout(next engine.HandleResult) engine.HandleResult {
return func(result engine.ParseResult) {
if result.Requests == nil {
for _, item := range result.Items {
log.Printf("Got item %d: %v", item.Id, item.Data)
}
} else {
for _, item := range result.Items {
log.Printf("Got item: %v", item.Data)
}
}
next(result)
}
}
- Finally, register the index and mapping in the main function:
e.Pip(pipeline.NewESHandler("movies", "config/movieMapping.json"))
Run Results
Summary
In this section we implemented the Pipeline using a publish-subscribe approach, stored the data in Elasticsearch, and did some simple data exploration with Kibana. With that, the single-machine crawler is complete. From here there are two directions to extend it: a distributed crawler, and building the front and back end of a search server.