通过《Colly源码解析——框架》分析,我们可以知道Colly执行的主要流程。本文将结合http://go-colly.org上的例子分析一些高级设置的底层实现。(转载请指明出于breaksoftware的csdn博客)
递归深度
以下例子截取于Basic
c := colly.NewCollector(
// Visit only domains: hackerspaces.org, wiki.hackerspaces.org
colly.AllowedDomains("hackerspaces.org", "wiki.hackerspaces.org"),
)
// On every a element which has href attribute call callback
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
link := e.Attr("href")
// Print link
fmt.Printf("Link found: %q -> %s\n", e.Text, link)
// Visit link found on page
// Only those links are visited which are in AllowedDomains
c.Visit(e.Request.AbsoluteURL(link))
})
c是Collector指针,它的Visit方法给scrape传递的“深度”值是1。
func (c *Collector) Visit(URL string) error {
return c.scrape(URL, "GET", 1, nil, nil, nil, true)
}
由于NewCollector构造的Collector.MaxDepth为0,而在scrape方法内部调用的requestCheck中,如果此值为0,则不会去做深度检测
// requestCheck method
if c.MaxDepth > 0 && c.MaxDepth < depth {
return ErrMaxDepth
}
如果希望通过MaxDepth控制深度,则可以参见Max depth例子
c := colly.NewCollector(
// MaxDepth is 1, so only the links on the scraped page
// is visited, and no further links are followed
colly.MaxDepth(1),
)
// On every a element which has href attribute call callback
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
link := e.Attr("href")
// Print link
fmt.Println(link)
// Visit link found on page
e.Request.Visit(link)
})
第4行将深度设置为1,这样理论上只能访问第一层的URL。
如果OnHTML中的代码和Basic例子一样,即使用Collector的Visit访问URL,则由于其depth一直传1,而导致requestCheck的深度检测一直不满足条件,从而会访问超过1层的URL。
所以第13行,调用的是HTMLElement的Visit方法
func (r *Request) Visit(URL string) error {
return r.collector.scrape(r.AbsoluteURL(URL), "GET", r.Depth+1, nil, r.Ctx, nil, true)
}
相较于Collector的Visit,HTMLElement的Visit方法将Depth增加了1,并且传递了请求的上下文(ctx)。由于depth有变化,所以之后的深度检测会返回错误,从而只会访问1层URL。
规则
Collector的Limit方法用于设置各种规则。这些规则最终在Collector的httpBackend成员中执行。
一个Collector只有一个httpBackend结构体指针,而一个httpBackend结构体可以有一组规则
type httpBackend struct {
LimitRules []*LimitRule
Client *http.Client
lock *sync.RWMutex
}
规则针对Domain来区分,我们可以通过设定不同的匹配规则,让每组URL执行相应的操作。这些操作包括:
- 访问并行数
- 访问间隔延迟
参见Parallel例子。只截取其中关键一段
// Limit the maximum parallelism to 2
// This is necessary if the goroutines are dynamically
// created to control the limit of simultaneous requests.
//
// Parallelism can be controlled also by spawning fixed
// number of go routines.
c.Limit(&colly.LimitRule{DomainGlob: "*", Parallelism: 2})
Collector的Limit最终会调用到httpBackend的Limit,它将规则加入到规则组后初始化该规则。
// Init initializes the private members of LimitRule
func (r *LimitRule) Init() error {
waitChanSize := 1
if r.Parallelism > 1 {
waitChanSize = r.Parallelism
}
r.waitChan = make(chan bool, waitChanSize)
hasPattern := false
if r.DomainRegexp != "" {
c, err := regexp.Compile(r.DomainRegexp)
if err != nil {
return err
}
r.compiledRegexp = c
hasPattern = true
}
if r.DomainGlob != "" {
c, err := glob.Compile(r.DomainGlob)
if err != nil {
return err
}
r.compiledGlob = c
hasPattern = true
}
if !hasPattern {
return ErrNoPattern
}
return nil
}
第7行创建了一个可以承载waitChanSize个元素的channel。可以看到,如果我们在规则中没有设置并行数,也会创建只有1个元素的channel。这个channel会被用于调节并行执行的任务数量。所以这也就意味着,一旦调用了Limit方法而没设置Parallelism值,该Collector中针对符合规则的请求就会变成串行的。
第10和18行分别针对不同规则初始化一个编译器。因为这个操作比较重,所以在初始化时执行,之后只是简单使用这些编译器即可。
当发起请求时,流程最终会走到httpBackend的Do方法
func (h *httpBackend) Do(request *http.Request, bodySize int) (*Response, error) {
r := h.GetMatchingRule(request.URL.Host)
if r != nil {
r.waitChan <- true
defer func(r *LimitRule) {
randomDelay := time.Duration(0)
if r.RandomDelay != 0 {
randomDelay = time.Duration(rand.Int63n(int64(r.RandomDelay)))
}
time.Sleep(r.Delay + randomDelay)
<-r.waitChan
}(r)
}
第2行通过域名查找对应的规则,如果找到,则在第4行尝试往channel中加入元素。这个操作相当于上锁。如果channel此时是满的,则该流程会被挂起。否则就执行之后的流程。在Do函数结束,命中规则的会执行上面的匿名函数,它在休眠规则配置的时间后,尝试从channel中获取数据。这个操作相当于释放锁。
Colly就是通过channel的特性实现了并行控制。
并行
在“规则”一节,我们讲到可以通过Parallelism控制并行goroutine的数量。httpBackend的Do方法最终将被Collector的fetch方法调用,而该方法可以被异步执行,即是一个goroutine。这就意味着承载Do逻辑的goroutine执行完毕后就会退出。而一种类似线程的技术在Colly也被支持,它更像一个生产者消费者模型。消费者线程执行完一个任务后不会退出,而在生产者生产出的物料池中取出未处理的任务加以处理。
以下代码截取于Queue
q, _ := queue.New(
2, // Number of consumer threads
&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage
)
……
for i := 0; i < 5; i++ {
// Add URLs to the queue
q.AddURL(fmt.Sprintf("%s?n=%d", url, i))
}
// Consume URLs
q.Run(c)
这次没有调用Collector的Visit等函数,而是调用了Queue的Run。
第2行创建了一个具有2个消费者(goroutine)的Queue。第10行预先给这个Queue加入5个需要访问的URL。
// AddURL adds a new URL to the queue
func (q *Queue) AddURL(URL string) error {
u, err := url.Parse(URL)
if err != nil {
return err
}
r := &colly.Request{
URL: u,
Method: "GET",
}
d, err := r.Marshal()
if err != nil {
return err
}
return q.storage.AddRequest(d)
}
AddUrl的第11行将请求序列化,在第15行将该序列化数据保存到“仓库”中。
在Run方法中,Colly将启动2个goroutine。注意它是使用for循环组织的,这意味着如果for内无break,它会一直循环执行下去——不退出。
func (q *Queue) Run(c *colly.Collector) error {
wg := &sync.WaitGroup{}
for i := 0; i < q.Threads; i++ {
wg.Add(1)
go func(c *colly.Collector, wg *sync.WaitGroup) {
defer wg.Done()
for {
如果队列中没有需要处理的request,则会尝试退出
if q.IsEmpty() {
if q.activeThreadCount == 0 {
break
}
ch := make(chan bool)
q.lock.Lock()
q.threadChans = append(q.threadChans, ch)
q.lock.Unlock()
action := <-ch
if action == stop && q.IsEmpty() {
break
}
}
activeThreadCount表示当前运行中的消费者goroutine数量。如果已经没有消费者了,则直接跳出for循环,整个goroutine结束。
如果还有消费者,则创建一个channel,并将其加入到q.threadChans的channel切片中。然后在第9行等待该channel被写入值。如果写入的是true并且此时没有需要处理的request,则退出goroutine。可以看到这段逻辑检测了两次是否有request,这个我们之后再讨论。
如果还有request要处理,则递增消费者数量(在finish中会递减以抵消)。然后从“仓库”中取出一个任务,在通过Request的Do方法发起请求,最后调用finish方法善后。
q.lock.Lock()
atomic.AddInt32(&q.activeThreadCount, 1)
q.lock.Unlock()
rb, err := q.storage.GetRequest()
if err != nil || rb == nil {
q.finish()
continue
}
r, err := c.UnmarshalRequest(rb)
if err != nil || r == nil {
q.finish()
continue
}
r.Do()
q.finish()
}
}(c, wg)
}
wg.Wait()
return nil
}
finish方法干了三件事:
- 递减消费者数量,以抵消Run方法中的递增。
- 将Queue的各个等待中的,其他goroutine创建的channel传入true值,即告知他们可以退出了。
- 给Queue创建一个空的channel切片
func (q *Queue) finish() {
q.lock.Lock()
q.activeThreadCount--
for _, c := range q.threadChans {
c <- stop
}
q.threadChans = make([]chan bool, 0, q.Threads)
q.lock.Unlock()
}
我们再看下怎么在请求的过程中给Queue增加任务
// AddRequest adds a new Request to the queue
func (q *Queue) AddRequest(r *colly.Request) error {
d, err := r.Marshal()
if err != nil {
return err
}
if err := q.storage.AddRequest(d); err != nil {
return err
}
q.lock.Lock()
for _, c := range q.threadChans {
c <- !stop
}
q.threadChans = make([]chan bool, 0, q.Threads)
q.lock.Unlock()
return nil
}
第3~9行,会将请求序列化后保存到“仓库”中。
第10~15行,会将其他goroutine创建的channel传入false,告知它们不要退出。然后再创建一个空的channel切片。
finish和AddRequest都使用锁锁住了所有的逻辑,而且它们都会把其他goroutine创建的channel传入值,然后将Queue的channel切片清空。这样就保证这些channel只可能收到一种状态。由于它自己创建的channel是在finish调用完之后才有机会创建出来,所以不会造成死锁。
再回来看goroutine退出的逻辑
if q.IsEmpty() {
if q.activeThreadCount == 0 {
break
}
ch := make(chan bool)
q.lock.Lock()
q.threadChans = append(q.threadChans, ch)
q.lock.Unlock()
action := <-ch
if action == stop && q.IsEmpty() {
break
}
}
如果finish方法中递减的activeThreadCount为0,这说明这是最后一个goroutine了,而且当前也没request,所以退出。当然此时存在一种可能:在1行执行结束后,其他非消费者goroutine调用AddRequest新增了若干request。而执行第2行时,goroutine将退出,从而导致存在request没有处理的可能。
如果还存在其他goroutine,则本goroutine将在第5行创建一个channel,并将这个channel加入到Queue的channel切片中。供其他goroutine调用finish往channel中传入true,或者AddRequest传入false,调控是否需要退出本过程。在第9行等待channel传出数据前,可能存在如下几种情况:
- 执行了finish
- 执行了AddRequest
- 执行了finish后执行了AddRequest
- 执行了AddRequest后执行了finish
如果是第1和4种,action将是false。第2和3种,action是true。但是这个情况下不能单纯的通过action决定是否退出。因为第9和10行执行需要时间,这段时间其他goroutine可能还会执行AddRequest新增任务,或者GetRequest删除任务。所以还要在第10行检测下IsEmpty。
这段是我阅读Colly中思考的最多的代码,因为有goroutine和channel,导致整个逻辑比较复杂。也感慨下,虽然goroutine很方便,但是真的能把它写对也是不容易的。
分布式
在Queue例子中,我们看到“仓库”这个概念。回顾下Queue的例子,“仓库”是InMemoryQueueStorage。顾名思义,它是一个内存型的仓库,所以不存在分布式基础。
// create a request queue with 2 consumer threads
q, _ := queue.New(
2, // Number of consumer threads
&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage
)
一个分布式的例子是Redis backend,截取一段
// create the redis storage
storage := &redisstorage.Storage{
Address: "127.0.0.1:6379",
Password: "",
DB: 0,
Prefix: "httpbin_test",
}
// add storage to the collector
err := c.SetStorage(storage)
if err != nil {
panic(err)
}
// delete previous data from storage
if err := storage.Clear(); err != nil {
log.Fatal(err)
}
// close redis client
defer storage.Client.Close()
// create a new request queue with redis storage backend
q, _ := queue.New(2, storage)
这儿创建了一个redis型的仓库。不仅Collector的Storage是它,Queue的Storage也是它。这样一个集群上的服务都往这个仓库里存入和取出数据,从而实现分布式架构。
redisstorage库引自github.com/gocolly/redisstorage。我们查看其源码,其实现了Collector的storage需要的接口
type Storage interface {
// Init initializes the storage
Init() error
// Visited receives and stores a request ID that is visited by the Collector
Visited(requestID uint64) error
// IsVisited returns true if the request was visited before IsVisited
// is called
IsVisited(requestID uint64) (bool, error)
// Cookies retrieves stored cookies for a given host
Cookies(u *url.URL) string
// SetCookies stores cookies for a given host
SetCookies(u *url.URL, cookies string)
}
以及Queue的storage需要的
// Storage is the interface of the queue's storage backend
type Storage interface {
// Init initializes the storage
Init() error
// AddRequest adds a serialized request to the queue
AddRequest([]byte) error
// GetRequest pops the next request from the queue
// or returns error if the queue is empty
GetRequest() ([]byte, error)
// QueueSize returns with the size of the queue
QueueSize() (int, error)
}
有疑问加站长微信联系(非本文作者)