Here are two basic solutions first; later I will upgrade them with message queues, Celery, Kafka, and Akka.

#! python3
# -*- coding: utf-8 -*-
import time

import pymysql
import html2text
from DBUtils.PooledDB import PooledDB

from sentiment_analysis import Sentiment_Analysis
from dialogue.dumblog import dlog

logger = dlog(__file__, console='debug')


class DB(object):
    pool = PooledDB(creator=pymysql, mincached=1, maxcached=20, host='',
                    port=33312, user='', passwd='', db='table',
                    charset='utf8', cursorclass=pymysql.cursors.DictCursor)

    def connect(self):
        db = self.pool.connection()
        cursor = db.cursor()
        return db, cursor

    def row_count(self):
        db, cursor = self.connect()
        cursor.execute("SELECT COUNT(*) FROM cluster WHERE binary_sentiment is NULL")
        row_count = cursor.fetchone()['COUNT(*)']
        return row_count

    def calculate_sentiment(self, news_dict):
        sa = Sentiment_Analysis()
        result_tuple = sa.sentiment_analysis(news_dict)[0]
        binary_sentiment_degree = int(result_tuple[0])
        probability_sentiment_degree = result_tuple[1]
        return binary_sentiment_degree, probability_sentiment_degree

    def loop_data(self, rows):
        res = []
        for row in rows:
            print(row['object_id'])
            title = html2text.html2text(row['title'])
            print(title)
            content = html2text.html2text(row['content'])
            news_dict = {'title': title, 'content': content}
            binary_sentiment_degree, probability_sentiment_degree = self.calculate_sentiment(news_dict)
            res.append({'object_id': row['object_id'],
                        'title': title,
                        'content': content,
                        'binary_sentiment_degree': binary_sentiment_degree,
                        'probability_sentiment_degree': probability_sentiment_degree})
        return res

    def update_data(self, res):
        attempts = 0
        basic_sleep_time = 1.2
        base_time = 10
        while attempts < 100:
            try:
                db, cursor = self.connect()
                for item in res:
                    # res holds up to 1000 rows per batch
                    cursor.execute("""
                        UPDATE cluster
                        SET title=%s, content=%s, binary_sentiment=%s, probability_sentiment=%s, update_time=%s
                        WHERE object_id=%s
                        """, (item['title'], item['content'],
                              item['binary_sentiment_degree'], item['probability_sentiment_degree'],
                              time.strftime('%Y-%m-%d %H:%M:%S'), item['object_id']))
                db.commit()
            except Exception as err:
                # log the error, then back off exponentially before retrying
                logger.error(err)
                attempts += 1
                total_time = base_time * (basic_sleep_time ** attempts)
                logger.info(total_time)
                time.sleep(total_time)
            else:
                break
            finally:
                cursor.close()
                db.close()

    def shift_data(self):
        row_count = self.row_count()
        logger.info("total rows need to be updated {}".format(row_count))
        offset_number = 0
        while offset_number <= row_count:
            db, cursor = self.connect()
            cursor.execute("SELECT * FROM cluster WHERE binary_sentiment is NULL order by object_id LIMIT 1000 OFFSET %s" % offset_number)
            rows = cursor.fetchall()
            res = self.loop_data(rows)
            self.update_data(res)
            logger.info("already update rows: {}".format(offset_number))
            offset_number += 1000


def main():
    db = DB()
    db.shift_data()


if __name__ == '__main__':
    main()

The first solution paginates with LIMIT/OFFSET, and adds a retry mechanism plus a connection pool to deal with the database dropping connections over and over (this was honestly the first time I had seen that; the error was "MySQL server has gone away", which was really strange). Also, with this much data, the ordinary approach of cursor.execute, as well as the peewee ORM, both break down, because they load the whole result set into memory at once and raise an out-of-memory error; pagination is what fixes that. So what is the problem with this approach? LIMIT/OFFSET gets slower and slower the further you go. Besides that, the real work here is updating each article's sentiment score with a machine learning algorithm, and I was only running a single process, so it was naturally slow. Profiling an update of 1000 rows with cProfile shows that, apart from the time spent establishing connections, the sentiment analysis code takes the longest:

import cProfile
import pstats

cProfile.run("db.shift_data()", "thing.txt")
p = pstats.Stats("thing.txt")
p.sort_stats("cumulative").print_stats(100)

This naturally leads to the multiprocessing version.

#! python3
# -*- coding: utf-8 -*-
import MySQLdb
from MySQLdb.cursors import DictCursor
import html2text
from multiprocessing import Process, cpu_count, Queue
from DBUtils.PooledDB import PooledDB

from sentiment_analysis import Sentiment_Analysis

POOL = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, host='',
                port=33312, user='', passwd='', db='table',
                charset='utf8mb4', use_unicode=True, cursorclass=DictCursor)


def connect():
    db = POOL.connection()
    cursor = db.cursor()
    return db, cursor


def add_job(job):
    # producer: page through the source table and push 1000-row batches onto the queue
    db, cursor = connect()
    total = 8746218
    print("total rows need to be updated {}".format(total))
    offset_number = 0
    while offset_number <= total:
        sql = """SELECT * FROM cluster order by object_id LIMIT %s OFFSET %s"""
        cursor.execute(sql, (1000, offset_number))
        rows = cursor.fetchall()
        job.put(rows)
        offset_number += 1000
        # print("already update {} rows".format(offset_number))


def create_process(job, concurrency):
    for _ in range(concurrency):
        process = Process(target=worker, args=(job, ))
        process.start()


def worker(job):
    # consumer: keep pulling batches off the queue and inserting them
    while True:
        try:
            task = job.get()
            insert(task)
        except Exception as err:
            print(err)


def insert(task):
    batch = []
    for item in task:
        title = html2text.html2text(item['title'])
        content = html2text.html2text(item['content'])
        news_dict = {'title': title, 'content': content}
        sa = Sentiment_Analysis()
        result_tuple = sa.sentiment_analysis(news_dict)[0]
        binary_sentiment_degree = int(result_tuple[0])
        probability_sentiment_degree = result_tuple[1]
        batch.append((item['object_id'], item['url'], item['title'], item['html'],
                      item['content'], item['category_name'], item['occur_time'],
                      item['category_link'], item['desc'], item['source'],
                      item['created_time'], item['keyword'], binary_sentiment_degree,
                      probability_sentiment_degree, item['update_time']))
    db, cursor = connect()
    sql = """INSERT INTO news (object_id, url, title, html, content, category_name, occur_time, category_link, news.desc, source, created_time, keyword, binary_sentiment, probability_sentiment, update_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
    try:
        cursor.executemany(sql, batch)
        db.commit()
        print('1000 rows inserted!')
    except MySQLdb.IntegrityError:
        pass
    finally:
        cursor.close()
        db.close()


def start(concurrency=cpu_count()):
    job = Queue()
    create_process(job, concurrency)
    add_job(job)
    try:
        job.close()
        job.join_thread()
    except Exception as err:
        print(err)


if __name__ == '__main__':
    start()

First, let me explain why I switched to MySQLdb here. There is a reason: when the code ran on Linux, pymysql unexpectedly threw an encoding error. The error message was:

cursor.execute(sql, (1000, offset_number)), UnicodeDecodeError: 'utf-8' codec can't decode byte 0xb1 in position 5221: invalid start byte

It was strange in any case, so I switched to MySQLdb: https://stackoverflow.com/questions/25865270/how-to-install-python-mysqldb-module-using-pip

I mainly followed that post for the installation. Note that the package to install is mysqlclient, and it may fail to build; you first need to run sudo apt-get install python-pip python-dev libmysqlclient-dev, after which the problem goes away. The multiprocessing version actually separates reads and writes with a single producer and multiple consumers (one consumer per process), and moves all of the data into another table.

That is roughly the situation, but is there still room for optimization? A message queue is worth considering: for example, read 1,000,000 rows at a time and distribute them evenly across 10 producer queues, then split each producer queue again into 10, giving 100 consumer queues. That is the idea, sketched below. As for why Celery, Kafka, and Akka: they are mainstream middleware frameworks, and Python, Java, and Scala are widely used with them.
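A rough sketch of that fan-out idea, using multiprocessing queues as a stand-in for a real message queue; the queue counts and the dispatch helper are illustrative only, not part of the original code:

from multiprocessing import Queue

NUM_PRODUCER_QUEUES = 10      # e.g. 1,000,000 rows split across 10 producer queues
CONSUMERS_PER_PRODUCER = 10   # each producer queue feeds 10 consumers -> 100 in total

producer_queues = [Queue() for _ in range(NUM_PRODUCER_QUEUES)]

def dispatch(batches):
    # spread 1000-row batches round-robin over the producer queues;
    # each producer queue would in turn be drained by CONSUMERS_PER_PRODUCER workers
    for i, batch in enumerate(batches):
        producer_queues[i % NUM_PRODUCER_QUEUES].put(batch)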

To be continued; the next article will be the queue version.

For logging in Python you can also use logbook (https://logbook.readthedocs.io/en/stable/), and for scheduling jobs, besides crontab, there is Azkaban (see 王彦鸿's "Azkaban入门篇" on zhuanlan.zhihu.com).
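A minimal logbook example, following the library's quickstart (the logger name is illustrative):

import sys
from logbook import Logger, StreamHandler

StreamHandler(sys.stdout).push_application()
log = Logger('sentiment-job')  # illustrative logger name
log.info('starting batch update')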

Follow-up update: after discussing this with others, a few points need revising. You can still use LIMIT with OFFSET, but you must use ORDER BY, and you should take the largest id of each batch of 1000 and keep advancing from that maximum value (see the sketch below). Also, if a MySQL table is well optimized, storing 100 million rows in a single table is completely fine.
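A minimal sketch of that keyset-style pagination for the DB class from the first solution; the function name is illustrative, and it assumes object_id is an indexed, monotonically increasing key:

def shift_data_keyset(db_obj):
    last_id = 0
    while True:
        db, cursor = db_obj.connect()
        cursor.execute(
            "SELECT * FROM cluster WHERE binary_sentiment IS NULL AND object_id > %s "
            "ORDER BY object_id LIMIT 1000", (last_id,))
        rows = cursor.fetchall()
        cursor.close()
        db.close()
        if not rows:
            break
        last_id = rows[-1]['object_id']  # keep advancing the largest id seen so far
        db_obj.update_data(db_obj.loop_data(rows))

Because each batch starts from the last seen object_id instead of an ever-growing OFFSET, the query cost stays roughly constant no matter how deep into the table you are.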
