Updating millions of rows in MySQL: how do you update a table with millions or tens of millions of rows?
Here are two basic solutions to start with; in later posts I'll upgrade them with a message queue (celery, kafka, akka).
#! python3
# -*- coding: utf-8 -*-
import time
import pymysql
import html2text
from DBUtils.PooledDB import PooledDB
from sentiment_analysis import Sentiment_Analysis
from dialogue.dumblog import dlog

logger = dlog(__file__, console='debug')


class DB(object):
    # connection pool, so dropped connections can be replaced transparently
    pool = PooledDB(creator=pymysql, mincached=1, maxcached=20, host='',
                    port=33312, user='', passwd='', db='table',
                    charset='utf8', cursorclass=pymysql.cursors.DictCursor)

    def connect(self):
        db = self.pool.connection()
        cursor = db.cursor()
        return db, cursor

    def row_count(self):
        db, cursor = self.connect()
        cursor.execute("SELECT COUNT(*) FROM cluster WHERE binary_sentiment is NULL")
        row_count = cursor.fetchone()['COUNT(*)']
        return row_count

    def calculate_sentiment(self, news_dict):
        sa = Sentiment_Analysis()
        result_tuple = sa.sentiment_analysis(news_dict)[0]
        binary_sentiment_degree = int(result_tuple[0])
        probability_sentiment_degree = result_tuple[1]
        return binary_sentiment_degree, probability_sentiment_degree

    def loop_data(self, rows):
        res = []
        for row in rows:
            print(row['object_id'])
            title = html2text.html2text(row['title'])
            print(title)
            content = html2text.html2text(row['content'])
            news_dict = {'title': title, 'content': content}
            binary_sentiment_degree, probability_sentiment_degree = self.calculate_sentiment(news_dict)
            res.append({'object_id': row['object_id'],
                        'title': title,
                        'content': content,
                        'binary_sentiment_degree': binary_sentiment_degree,
                        'probability_sentiment_degree': probability_sentiment_degree})
        return res

    def update_data(self, res):
        attempts = 0
        basic_sleep_time = 1.2
        base_time = 10
        while attempts < 100:
            db = cursor = None
            try:
                db, cursor = self.connect()
                for item in res:
                    # res holds at most 1000 rows per batch
                    cursor.execute("""
                        UPDATE cluster
                        SET title=%s, content=%s, binary_sentiment=%s, probability_sentiment=%s, update_time=%s
                        WHERE object_id=%s
                        """, (item['title'], item['content'], item['binary_sentiment_degree'],
                              item['probability_sentiment_degree'],
                              time.strftime('%Y-%m-%d %H:%M:%S'), item['object_id']))
                db.commit()
            except Exception as err:
                # log the error, then back off exponentially before retrying
                logger.error(err)
                attempts += 1
                total_time = base_time * (basic_sleep_time ** attempts)
                logger.info(total_time)
                time.sleep(total_time)
            else:
                break
            finally:
                if cursor:
                    cursor.close()
                if db:
                    db.close()

    def shift_data(self):
        row_count = self.row_count()
        logger.info("total rows need to be updated {}".format(row_count))
        offset_number = 0
        while offset_number <= row_count:
            db, cursor = self.connect()
            cursor.execute("SELECT * FROM cluster WHERE binary_sentiment is NULL "
                           "order by object_id LIMIT 1000 OFFSET %s" % offset_number)
            rows = cursor.fetchall()
            res = self.loop_data(rows)
            self.update_data(res)
            logger.info("already update rows: {}".format(offset_number))
            offset_number += 1000


def main():
    db = DB()
    db.shift_data()


if __name__ == '__main__':
    main()
The first solution paginates with LIMIT/OFFSET, and adds a retry mechanism plus a connection pool to cope with the database constantly dropping connections (this was honestly the first time I ran into that; the error was "MySQL server has gone away", which was really odd). With this much data, the naive approach of a plain cursor.execute, or the peewee ORM, simply falls over, because they try to load the whole result set into memory at once and you get out-of-memory errors; pagination is what fixes that. So what is the problem with this approach? LIMIT/OFFSET gets slower and slower the further you page, and since I am updating each article's sentiment score with a machine-learning model in a single process, it is naturally slow. Profiling a 1000-row update with cProfile shows that, apart from the time spent establishing connections, the sentiment-analysis code takes the longest:
import cProfile
import pstats
cProfile.run("db.shift_data()", "thing.txt")
p = pstats.Stats("thing.txt")
p.sort_stats("cumulative").print_stats(100)
Which naturally leads to the multiprocessing version:
#! python3
# -*- coding: utf-8 -*-
import MySQLdb
from MySQLdb.cursors import DictCursor
import html2text
from multiprocessing import Process, cpu_count, Queue
from DBUtils.PooledDB import PooledDB
from sentiment_analysis import Sentiment_Analysis

POOL = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, host='',
                port=33312, user='', passwd='', db='table',
                charset='utf8mb4', use_unicode=True, cursorclass=DictCursor)


def connect():
    db = POOL.connection()
    cursor = db.cursor()
    return db, cursor


def add_job(job):
    # producer: page through the source table and push each 1000-row batch onto the queue
    db, cursor = connect()
    total = 8746218
    print("total rows need to be updated {}".format(total))
    offset_number = 0
    while offset_number <= total:
        sql = """SELECT * FROM cluster order by object_id LIMIT %s OFFSET %s"""
        cursor.execute(sql, (1000, offset_number))
        rows = cursor.fetchall()
        job.put(rows)
        offset_number += 1000
        # print("already update {} rows".format(offset_number))


def create_process(job, concurrency):
    for _ in range(concurrency):
        process = Process(target=worker, args=(job,))
        process.start()


def worker(job):
    # consumer: keep pulling batches off the queue and writing them to the new table
    while True:
        try:
            task = job.get()
            insert(task)
        except Exception as err:
            print(err)


def insert(task):
    batch = []
    for item in task:
        title = html2text.html2text(item['title'])
        content = html2text.html2text(item['content'])
        news_dict = {'title': title, 'content': content}
        sa = Sentiment_Analysis()
        result_tuple = sa.sentiment_analysis(news_dict)[0]
        binary_sentiment_degree = int(result_tuple[0])
        probability_sentiment_degree = result_tuple[1]
        batch.append((item['object_id'],
                      item['url'],
                      item['title'],
                      item['html'],
                      item['content'],
                      item['category_name'],
                      item['occur_time'],
                      item['category_link'],
                      item['desc'],
                      item['source'],
                      item['created_time'],
                      item['keyword'],
                      binary_sentiment_degree,
                      probability_sentiment_degree,
                      item['update_time']))
    db, cursor = connect()
    sql = """INSERT INTO news (object_id, url, title, html, content, category_name, occur_time, category_link, news.desc, source, created_time, keyword, binary_sentiment, probability_sentiment, update_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
    try:
        cursor.executemany(sql, batch)
        db.commit()
        print('1000 rows inserted!')
    except MySQLdb.IntegrityError:
        pass
    finally:
        cursor.close()
        db.close()


def start(concurrency=cpu_count()):
    job = Queue()
    create_process(job, concurrency)
    add_job(job)
    try:
        job.close()
        job.join_thread()
    except Exception as err:
        print(err)


if __name__ == '__main__':
    start()
First, let me explain why I switched to MySQLdb here. There is a reason: when the code ran on Linux, pymysql unexpectedly threw an encoding error, with the message below:
cursor.execute(sql, (1000, offset_number)), unicodedecodeerror: 'utf-8' codec can't decode byte 0xb1 in position 5221: invalid start byte
It was equally mysterious, so I had no choice but to switch to MySQLdb: https://stackoverflow.com/questions/25865270/how-to-install-python-mysqldb-module-using-pip
I mainly followed that post for the installation. Note that what you install is mysqlclient; if it fails, first run sudo apt-get install python-pip python-dev libmysqlclient-dev and the problem goes away. The multiprocessing version above actually separates reads and writes in a single-producer, multi-consumer pattern (as many consumers as there are processes), moving all the data into another table.
That is roughly where things stand, but is there still room for optimization? A message queue is worth considering: for example, read 1,000,000 rows at a time, spread them evenly across 10 producer queues, then split each producer queue into 10 again, giving 100 consumer queues. That is the idea (a minimal sketch follows below). As for why celery, kafka, and akka: they are mainstream middleware frameworks, and they are the ones most commonly used with Python, Java, and Scala respectively.
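As a taste of that direction, here is a minimal celery sketch of the dispatch side. The Redis broker URL and the process_batch task are my own assumptions for illustration, and it uses a single queue rather than the full 10-producer / 100-consumer layout described above.

# Minimal celery sketch (assumed Redis broker and a hypothetical
# process_batch task; not the final 10x10 queue layout).
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def process_batch(object_ids):
    # consumer side: a worker would load these rows, run the sentiment
    # analysis and write the results back, much like insert() above
    pass

def dispatch(all_ids, batch_size=1000):
    # producer side: split the ids into batches and push each batch onto
    # the queue; celery workers pick them up in parallel
    for i in range(0, len(all_ids), batch_size):
        process_batch.delay(all_ids[i:i + batch_size])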
To be continued; the next post will be the queue version.
Also, for logging in Python you can use logbook (https://logbook.readthedocs.io/en/stable/), and for job scheduling there is Azkaban besides crontab (王彦鸿: Azkaban入门篇, zhuanlan.zhihu.com).
Follow-up: after talking this over with others, a few points deserve an update. You can keep the LIMIT-based paging, but make sure to ORDER BY object_id and, instead of a growing OFFSET, take the largest object_id of each 1000-row batch and keep advancing from that value (a rough sketch follows below). Also, if a MySQL table is well tuned, keeping 100 million rows in a single table is not a problem at all.
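Here is a minimal sketch of that keyset-style paging, reusing the DB class and the cluster table / object_id column from the first script above; the exact SQL is my own guess at the idea, not code from the original scripts.

# Keyset-style paging sketch (assumes the DB class and cluster table above):
# remember the largest object_id of the previous batch and start the next
# SELECT after it, so late pages stay as fast as early ones.
db, cursor = DB().connect()
last_id = 0
while True:
    cursor.execute(
        "SELECT * FROM cluster WHERE object_id > %s ORDER BY object_id LIMIT 1000",
        (last_id,))
    rows = cursor.fetchall()
    if not rows:
        break
    # ... run sentiment analysis and UPDATE this batch here ...
    last_id = rows[-1]['object_id']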