背景说明

一个scrapy project 可以有多个spider,但是它们之前的settings要分开怎么办?redis等配置要各自配置的情况怎么处理?
如果一个任务一个scrapy,感觉对复用方面很不友好

大体的思路:
一般这种抽象,我会采用策略模式来做,同时兼顾单例,根据flag的不同返回不同的策略对象,此对象拥有自己的独有逻辑

问题与解决方案

总体思路是这样
这里写图片描述

策略模式的思路

策略模式自行百度下,有很多例子
结合这边就是 写个spider相关类,
ParentSpiderProc 父类,用于写一些公共方法
AspiderProc, BspiderProc,具体的类,子类然后里面不同的方法,如 getLog,getKfkClient,getRedis等
SpiderContext,对外的方法类,这个对象就是外围实际使用的类,

spiderContext = SpiderContext.getSpiderContext(spider.name)
spiderContext.getLog()....

spiderContext.getKfkClient()....

settings.py,各spider会共用,这种设置怎么区分开来

这个可以用spider自己的custom_settings处理,在一个地方放 settings_XXSpider.py,在parse的时候加载进来,如

class StItemPage(scrapy.Spider):
    custom_settings = SpiderContext.getSpiderContext(name).getCustomSettings()

redis和kafka这种怎么按spider的不同设定不同的值

1,在 settings_ASpider.py 中

KAFKA_BOOTSTRAP_SERVERS="xxxxx:9092,xxxx:9094"
KAFKA_TOPIC_ID="aaaa_toic"

2,AspiderProc定义一个方法

def getKafka(self):
    if not hasattr(self, "akfk"):
    self.akfk = KfkClient(bootstrap_servers=settings_Aspider.KAFKA_BOOTSTRAP_SERVERS)
    return self.akfk

3,具体使用

spiderContext = SpiderContext.getSpiderContext(spider.name)
topic_id = spider.custom_settings['KAFKA_TOPIC_ID']
result = spiderContext.getKafka().sendMsg(topic_id, seqstr.encode("utf8"), targetjson.encode('utf8'))

HTTP代理池怎么处理?

代理池和上面的kfk没什么不同,不过代理池不是直接调用,自有逻辑比较多。自己有一个类GProxy
代理池用redis,怎么把redis传进来?
GProxy初始化时,加上redis对象
def init(self,credisObj):

然后在Proc中获取Gproxy时,要先new一个redis对象

# 父类直接实现
def getGProxy(self):
    # proxy使用comm的redis配置,即settings.py的
    if not hasattr(self, "gProxy"):
        self.gProxy = GProxy(self.getCommRedis())
    return self.gProxy


def getCommRedis(self):
    if not hasattr(self, "r0"):
        self.r0 = redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB_INDEX,password=settings.REDIS_DB_PWD,decode_responses=True)
    return self.r0

这里说明一下,由于proxy我定性为公共的内容,所以这里是getCommRedis,其链接也是在settings拿的

LOG怎么搞

log除了普通的log,还有一些是 专用log,流程上的一些具体打印的日志,如请求一个url,一个log,返回一个log, 全部处理完了一个log,这些都是不同的日志

首先我们有一个logUtil

    def createLogger(loggerName,loggerPath,displayinTerminal=False):  # 日志
        fmt = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        datefmt = "%a %d %b %Y %H:%M:%S"
        formatter = logging.Formatter(fmt, datefmt)

        # 数据日志输出
        logger = logging.getLogger(loggerName)
        if logger.handlers and len(logger.handlers)>0:
            return  logger
        logger.setLevel(logging.DEBUG)
        # create file handler
        fh = logging.handlers.RotatingFileHandler(filename=loggerPath,encoding="utf8")
        fh.setLevel(logging.INFO)
        # create formatter
        fh.setFormatter(formatter)

        if displayinTerminal:
            ch = logging.StreamHandler()
            ch.setLevel(logging.INFO)
            ch.setFormatter(formatter)
            logger.addHandler(ch)

        arr = logger.handlers
        for chan in arr:
            print (chan)
        logger.addHandler(fh)
        return logger

其次,我们在 XspiderProc中拿不同的log, 使用(实现代码看下一章,):

# 普通:
spiderContext = SpiderContext.getSpiderContext(spider.name)
spiderContext.getLog().errorLog().error("请求抛错了<<<<,错误信息为{},代理地址为:{},目标地址为{}".format(exception,proxy,targeturl))

# 具体日志:
spiderContext.getLog().grab_response().info("请求返回<<<<url为:{},返回的状态:{}".format(request.url,response.status))
spiderContext.getLog().grab_request().info(">>>正在请求>>>>{},其meta为:{},其header为{}".format( request.url,request.meta,request.headers))

具体代码

ParentSpiderProc类:

# -*- coding: utf-8 -*-

from dspider import settings
import redis
from dspider.tools.GetProxy import GetProxy
from dspider.utils.LoggerUtil import LogUtils


# 请不要直接new
class ParentSpiderProc(object):

    def getSpiderName(self):
        return self.spiderName

    def setSpiderName(self, a_spiderName):
        self.spiderName = a_spiderName

    # 子类需要实现
    def getBusiRedis(self):
        raise NotImplementedError()

    # 获取setting
    def getCustomSettings(self):
        raise NotImplementedError()


    def getKafka(self):
        raise NotImplementedError()

    def getLog(self):
        raise NotImplementedError()


    # 父类直接实现
    def getGProxy(self):
        # proxy使用comm的redis配置,即settings.py的
        if not hasattr(self, "gProxy"):
            self.gProxy = GetProxy("http", self.getCommRedis())
        return self.gProxy


    def getCommRedis(self):
        if not hasattr(self, "r0"):
            self.r0 = redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB_INDEX,
                                  password=settings.REDIS_DB_PWD,
                                  decode_responses=True)
        return self.r0


class BaseBusiLog(object):
    # 根据 spiderName会有不同的目录和log前缀

    def __init__(self,spiderName):
        if not hasattr(self,"lname"):
            self.lname = spiderName
        super().__init__()

    def grab_request(self):
        return BaseBusiLog.__newLog__("grab_request",self.lname)

    def grab_response(self):
        return BaseBusiLog.__newLog__("grab_response",self.lname)

    def errorLog(self):
        return BaseBusiLog.__newLog__("errorLog",self.lname)

    def infoLog(self):
        return BaseBusiLog.__newLog__("infoLog",self.lname)

    def pipelinesLog(self):
        # pipeline到json文件(测试时)
        return BaseBusiLog.__newLog__("pipelinesLog", self.lname)

    def getLogPath(self):
        log_dataInfo_path = settings.LOG_DIR + "/" + self.lname + "/"
        return  log_dataInfo_path



    @staticmethod
    def __newLog__(logname, lname):
        unionLogName = lname+"_"+logname
        log_dataInfo_path = settings.LOG_DIR + "/" + lname +  "/flow_" + unionLogName + ".log"
        logObj = LogUtils.createLogger(logname, log_dataInfo_path, False)
        return logObj

ASpiderProc:

# -*- coding: utf-8 -*-

from dspider import settings_ASpider
from dspider.spider_strategy.ParentSpiderProc import *
from dspider.tools.KfkClient import KfkClient
from dspider.utils.PythonToMap import PythonToMap

# settings_ASpider只允许在这里出现
class ASpiderProc(ParentSpiderProc):

    def getCustomSettings(self):
        return PythonToMap.genMap(settings_ASpider)

    # def getGProxy(self):
    #     return super().getGProxy()

    def getKafka(self):
        if not hasattr(self, "akfk"):
            self.akfk = KfkClient(bootstrap_servers=settings_ASpider.KAFKA_BOOTSTRAP_SERVERS)
        return self.akfk

    def getLog(self):
        if not hasattr(self, "alog"):
            self.alog = DetailLog(self.spiderName)
        return self.alog

    def getBusiRedis(self):
        if not hasattr(self, "r1"):
            self.r1 = redis.Redis(host=settings_ASpider.BUSI_REDIS_HOST,
                                  port=settings_ASpider.BUSI_REDIS_PORT,
                                  db=settings_ASpider.BUSI_REDIS_DB_INDEX,
                                  password=settings_ASpider.BUSI_REDIS_DB_PWD,
                                  decode_responses=True)
        return self.r1


class DetailLog(BaseBusiLog):

    def succ_response_but_ignore(self):
        return DetailLog.__newLog__("succ_response_but_ignore", self.lname)

    def succ_response_and_process(self):
        return DetailLog.__newLog__("succ_response_and_process", self.lname)

SpiderContext:

# -*- coding: utf-8 -*-

from dspider.spider_strategy.ASpiderProc import *
from dspider.spider_strategy.PageSpiderProc import *

spiderContextMap = {}

class SpiderContext(object):
    # 请不要直接使用,请用SpiderContext.getSpiderContext
    def __init__(self,spiderName):
        if spiderName=='ASpider':
            self.spiderProc = ASpiderProc()
        elif spiderName == 'BSpider':
            self.spiderProc = BSpiderProc()
        else:
            raise Exception("不知道的spiderName")
        self.spiderProc.setSpiderName(spiderName)

    @staticmethod
    def getSpiderContext(spiderName):
        targetContext = spiderContextMap.get(spiderName)
        if not targetContext:
            targetContext = SpiderContext(spiderName)  #不同的name不同的context
            spiderContextMap[spiderName] = targetContext
        return  targetContext

    def getSpiderName(self):
        return self.spiderProc.getSpiderName()


    def getCustomSettings(self):
        return self.spiderProc.getCustomSettings()


    def getKafka(self):
        return self.spiderProc.getKafka()

    def getLog(self):
        return self.spiderProc.getLog()

    def getGProxy(self):
        return self.spiderProc.getGProxy()

    def getCommRedis(self):
        return self.spiderProc.getCommRedis()

    def getBusiRedis(self):
        return self.spiderProc.getBusiRedis()

上面提到的工具类 PythonToMap:

# -*- coding: utf-8 -*-

import inspect
from dspider import settings
class PythonToMap(object):

    @staticmethod
    def genMap(targetPy):
        targetObj = {}
        for key, obj in inspect.getmembers(targetPy):
            # print("{}:{}".format(key, obj))
            if inspect.isclass(obj):
                continue
            if inspect.ismodule(obj):
                continue
            if '__' in key:
                continue
            if inspect.ismethod(key):
                continue
            targetObj[key] = obj
        # print(targetObj.keys())
        return  targetObj

if __name__ == '__main__':

    print (PythonToMap.genMap(settings))

一些说明

  • 记得要继承 ASpiderProc(ParentSpiderProc)
  • log我是单独一个类来处理了,同时有一个BaseXXLog用于写共有的日志
  • log的路径 LOG_DIR是在settings中配置,然后不同的路径:LOG_DIR/{spidername}/{spidername}_flow_XXX.log
  • .py文件变成 Map类型,需要 用 PythonToMap
  • redis有一个小矛盾的地方,不确认是公共的还是私有的,所以我两个都提供了.
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐