日常学习-20240710
批量插入数据,redis实现session共享,redis分布式锁,MQ处理消息,kafka速度,elasticsearch
1、一次一千万条数据插入和删除案例:
第一次:插入--批量插入,每次插入5000条数据,总耗时28min,数据无异常
删除--通过sql语句一次性删除,总耗时1h52min;一次删除的数据过多导致mysql的备份恢复文件极其庞大,相应的日志文件也在不停的膨胀
第二次:鉴于第一次插入和删除都非常耗时,对mysql参数和表做了如下修改;
- 查看当前设置:
SHOW VARIABLES LIKE 'max_allowed_packet';
SHOW VARIABLES LIKE 'innodb_buffer_pool_size';
SHOW VARIABLES LIKE 'innodb_log_file_size';
SHOW VARIABLES LIKE 'innodb_log_buffer_size';
其中max_allowed_packet:
默认值通常为4MB或16MB。(实际为1M)
如果需要处理大型数据包(如包含大量数据的INSERT语句),可能需要将其设置为更大的值,如64MB或更大。但请注意,增加此值也会增加内存使用。(一次批处理数据超过1万条就放不下)
innodb_buffer_pool_size:(能等待多次批处理数据,再和数据库交互,实际设置为1024M)
建议将其设置为系统内存的50%-80%。
如果服务器上只运行MySQL,可以设置为系统内存的更高比例(如80%)。
如果还运行其他应用程序,可能需要将其设置为较低的比例(如70%)。
innodb_log_file_size:(默认为24M,无法修改,修改后mysql服务无法启动)
建议将其设置为系统总内存的10%-25%。
较大的日志文件可以减少写入频率,但也会增加恢复时间。
innodb_log_buffer_size:(默认为1M,修改为4M)
默认值通常足够用于大多数应用程序。
如果执行大量小事务,并且发现重做日志写入成为性能瓶颈,可以考虑增加此值。
- 找到MySQL的配置文件(通常是my.cnf或my.ini),并编辑它以包含新的参数值。
编辑配置文件:
打开my.cnf或my.ini文件。
在[mysqld]部分下添加或修改以下行:
[mysqld]
max_allowed_packet = 64M
innodb_buffer_pool_size = 1024M
innodb_log_file_size = 24M
innodb_log_buffer_size = 4M - 重启验证和注意事项
重启MySQL服务
验证更改
修改innodb_log_file_size时,需要特别注意,因为只有在MySQL服务完全停止后才能更改此值。如果MySQL正在运行,则无法直接修改此值。
在修改任何配置之前,请务必备份原始配置文件,以防止意外情况发生。 - 将对应表的非主键索引和外键都暂时禁用(插入时需要用到mysql的自增功能,主键和主键的索引不能删除(在批量删除时,如果去除主键和主键索引,反而会变得更慢);禁用方式为临时删除,后期再加上)
- 插入时分批添加,一次批处理数据量为5万条,耗时6-7min;删除时也是分批删除,一次批处理数据量为5万条,耗时2-3min
第三次:考虑在插入数据时使用线程池多线程进行批量插入,总耗时也是在6-7min,数据无异常,本次多线程对插入时间提升不明显。以下是第二次和第三次操作的具体实现:
第二次插入
if (users.size() >= 50000) {
userDao.insertBatch(users);
// users集合清空
users.clear();
}
第三次插入:多线程中使用的集合为线程安全的集合,每个线程都只会批量插入自己线程中的副本数据;每次批量插入完成(或者发生异常时)需要将存在本地变量中的集合清空,并且将本地变量清空(防止内存溢出);
ThreadLocal<List<User>> threadLocalObjects = new ThreadLocal<>();
if (users.size() >= 50000) {
CopyOnWriteArrayList<User> usersCopy = new CopyOnWriteArrayList<>(users);
threadPoolExecutor.execute(() -> {
try {
threadLocalObjects.set(usersCopy);
// 现在,对threadLocalObjects.get()的操作都是在副本上进行,线程安全
userDao.insertBatch(threadLocalObjects.get());
} catch (Exception e) {
// 异常处理逻辑,可以根据实际情况调整
e.printStackTrace();
} finally {
// 确保在所有情况下都清除线程局部变量
threadLocalObjects.get().clear();
threadLocalObjects.remove();
}
});
users.clear();
}
}
// 等待线程执行完毕,但是不再接受新的任务
threadPoolExecutor.shutdown();
if(!threadPoolExecutor.awaitTermination(20, TimeUnit.MINUTES)){
System.out.println("线程执行超时");
}
System.out.println("数据填充完毕");
分批删除
// 1千万条数据删除,不推荐多线程删除
// 多次批量删除,临时禁用索引
ArrayList<Integer> ids = new ArrayList<>();
Integer count = userDao.selectCount(new QueryWrapper<>(new User()));
for (int i = 40; i < count + 9; i++) {
ids.add(i);
if (ids.size() >= 50000) {
userDao.deleteBatchIds(ids);
// 清空ids集合
ids.clear();
}
if (i == count - 1 + 9 && ids.size() > 0) {
userDao.deleteBatchIds(ids);
// 清空ids集合
ids.clear();
}
}
System.out.println("数据删除完毕");
2、redis实现session共享
第一是创建令牌的程序,就是在用户初次访问服务器时,给它创建一个唯一的身份标识,并且使用cookie封装这个标识再发送给客户端。那么当客户端下次再访问服务器时,就会自动携带这个身份标识了,这和SESSIONID的道理是一样的,只是改由我们自己来实现了。另外,在返回令牌之前,我们需要将它存储起来,以便于后续的验证。而这个令牌是不能保存在服务器本地的,因为其他服务器无法访问它。因此,我们可以将其存储在服务器之外的一个地方,那么Redis便是一个理想的场所。
第二是验证令牌的程序,就是在用户再次访问服务器时,我们获取到了它之前的身份标识,那么我们就要验证一下这个标识是否存在了。验证的过程很简单,我们从Redis中尝试获取一下就可以知道结果。
3、 如何利用Redis实现一个分布式锁?
加锁,通过“set...nx...”命令,将加锁、过期命令编排到一起,它们是原子操作了,可以避免死锁。
set key value nx ex seconds
解锁,确保原子操作和自己只能解自己的锁
# 解锁 if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
4、LRU算法和LFU算法理解
LRU算法实现方式:
- 通常会使用双向链表(Doubly Linked List)和哈希表(Hash Table)的结合来实现。哈希表用于快速查找数据项,而双向链表则用于保持数据项的顺序,即最近访问的数据项靠近链表头部,最久未访问的数据项靠近链表尾部。
- 当访问一个数据项时,如果它已经在链表中,则将其移动到链表头部;如果它不在链表中,则添加到链表头部,并可能需要在达到缓存容量上限时从链表尾部移除一个数据项。
LFU算法实现方式:
- LFU算法的实现比LRU更复杂,因为它需要跟踪每个数据项的访问频率。
- 一种常见的实现方式是使用哈希表来存储数据项及其对应的访问频率,同时使用双向链表(或最小堆)来维护频率的顺序。每个链表节点代表一个频率,而链表节点内部又使用哈希表或双向链表来存储具有相同访问频率的数据项。
- 当访问一个数据项时,需要更新其访问频率,并根据新的频率调整其在链表中的位置。如果达到缓存容量上限,则需要从具有最低访问频率的链表节点中移除一个数据项。
5、MQ处理消息失败了怎么办?
一般生产环境中,都会在使用MQ的时候设计两个队列:一个是核心业务队列,一个是死信队列。核心业务队列,就是比如专门用来让订单系统发送订单消息的,然后另外一个死信队列就是用来处理异常情况的。
比如说要是第三方物流系统故障了,此时无法请求,那么仓储系统每次消费到一条订单消息,尝试通知发货和配送,都会遇到对方的接口报错。此时仓储系统就可以把这条消息拒绝访问,或者标志位处理失败!注意,这个步骤很重要。
一旦标志这条消息处理失败了之后,MQ就会把这条消息转入提前设置好的一个死信队列中。然后你会看到的就是,在第三方物流系统故障期间,所有订单消息全部处理失败,全部会转入死信队列。然后你的仓储系统得专门有一个后台线程,监控第三方物流系统是否正常,能否请求的,不停的监视。一旦发现对方恢复正常,这个后台线程就从死信队列消费出来处理失败的订单,重新执行发货和配送的通知逻辑。死信队列的使用,其实就是MQ在生产实践中非常重要的一环,也就是架构设计必须要考虑的。
6、Kafka为什么速度快?
Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间,但是实际上,Kafka的特性之一就是高吞吐率。即使是普通的服务器,Kafka也可以轻松支持每秒百万级的写入请求,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。
下面从数据写入和读取两方面分析,为什么Kafka速度这么快:
写入数据:
Kafka会把收到的消息都写入到硬盘中,它绝对不会丢失数据。为了优化写入速度Kafka采用了两个技术,顺序写入和MMFile 。
一、顺序写入
磁盘读写的快慢取决于你怎么使用它,也就是顺序读写或者随机读写。在顺序读写的情况下,磁盘的顺序读写速度和内存持平。因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最讨厌随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。
而且Linux对于磁盘的读写优化也比较多,包括read-ahead和write-behind,磁盘缓存等。如果在内存做这些操作的时候,一个是JAVA对象的内存开销很大,另一个是随着堆内存数据的增多,JAVA的GC时间会变得很长,使用磁盘操作有以下几个好处:
-
磁盘顺序读写速度超过内存随机读写;
-
JVM的GC效率低,内存占用大。使用磁盘可以避免这一问题;
-
系统冷启动后,磁盘缓存依然可用。
下图就展示了Kafka是如何写入数据的, 每一个Partition其实都是一个文件 ,收到消息后Kafka会把数据插入到文件末尾(虚框部分):
这种方法有一个缺陷——没有办法删除数据 ,所以Kafka是不会删除数据的,它会把所有的数据都保留下来,每个消费者(Consumer)对每个Topic都有一个offset用来表示读取到了第几条数据 。
二、Memory Mapped Files
即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以Kafka的数据并不是实时的写入硬盘 ,它充分利用了现代操作系统分页存储来利用内存提高I/O效率。Memory Mapped Files(后面简称mmap)也被翻译成 内存映射文件,在64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。
通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小有虚拟内存为我们兜底。使用这种方式可以获取很大的I/O提升,省去了用户空间到内核空间复制的开销(调用文件的read会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中。)
但也有一个很明显的缺陷——不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。Kafka提供了一个参数——producer.type来控制是不是主动flush,如果Kafka写入到mmap之后就立即flush然后再返回Producer叫 同步 (sync);写入mmap之后立即返回Producer不调用flush叫异步 (async)。
读取数据:
一、基于sendfile实现Zero Copy
传统模式下,当需要对一个文件进行传输的时候,其具体流程细节如下:
-
调用read函数,文件数据被copy到内核缓冲区;
-
read函数返回,文件数据从内核缓冲区copy到用户缓冲区;
-
write函数调用,将文件数据从用户缓冲区copy到内核与socket相关的缓冲区;
-
数据从socket缓冲区copy到相关协议引擎。
以上细节是传统read/write方式进行网络文件传输的方式,我们可以看到,在这个过程当中,文件数据实际上是经过了四次copy操作:硬盘->内核buf->用户buf->socket相关缓冲区->协议引擎。而sendfile系统调用则提供了一种减少以上多次copy,提升文件传输性能的方法。
在内核版本2.1中,引入了sendfile系统调用,以简化网络上和两个本地文件之间的数据传输。sendfile的引入不仅减少了数据复制,还减少了上下文切换。运行流程如下:
-
sendfile系统调用,文件数据被copy至内核缓冲区;
-
再从内核缓冲区copy至内核中socket相关的缓冲区;
-
最后再socket相关的缓冲区copy到协议引擎。
相较传统read/write方式,2.1版本内核引进的sendfile已经减少了内核缓冲区到user缓冲区,再由user缓冲区到socket相关缓冲区的文件copy,而在内核版本2.4之后,文件描述符结果被改变,sendfile实现了更简单的方式,再次减少了一次copy操作。(即由内核缓冲区copy到内核中socket相关缓冲区的过程,使用偏移量替代,只是将引用传递给socket缓冲区,实际数据还是在内核缓冲区)
在Apache、Nginx、lighttpd等web服务器当中,都有一项sendfile相关的配置,使用sendfile可以大幅提升文件传输性能。Kafka把所有的消息都存放在一个一个的文件中,当消费者需要数据的时候Kafka直接把文件发送给消费者,配合mmap作为文件读写方式,直接把它传给sendfile。
二、批量压缩
在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。
-
如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩;
-
Kafka允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩;
-
Kafka支持多种压缩协议,包括Gzip和Snappy压缩协议。
总结:
Kafka速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优。读取数据的时候配合sendfile直接暴力输出。
7、说说ElasticSearch put的全过程
put过程主要分为三个阶段:
-
协调阶段:
Client 客户端选择一个 node 发送 put 请求,此时当前节点就是协调节点(coordinating node)。协调节点根据 document 的 id 进行路由,将请求转发给对应的 node。这个 node 上的是 primary shard 。
-
主要阶段:
对应的 primary shard 处理请求,写入数据 ,然后将数据同步到 replica shard。
-
primary shard 会验证传入的数据结构;
-
本地执行相关操作;
-
将操作转发给 replica shard。
当数据写入 primary shard 和 replica shard 成功后,路由节点返回响应给 Client。
-
-
副本阶段:
每个 replica shard 在转发后,会进行本地操作。
在写操作时,默认情况下,只需要 primary shard 处于活跃状态即可进行操作。在索引设置时可以设置这个属性:index.write.wait_for_active_shards。默认是 1,即 primary shard 写入成功即可返回。 如果设置为 all 则相当于 number_of_replicas+1 就是 primary shard 数量 + replica shard 数量。就是需要等待 primary shard 和 replica shard 都写入成功才算成功。可以通过索引设置动态覆盖此默认设置。
8、说说ElasticSearch的倒排索引
Elasticsearch 使用一种称为倒排索引的结构,它适用于快速的全文搜索。一个倒排索引由文档中所有不重复词的列表构成,对于其中每个词,有一个包含它的文档列表。
例如,假设我们有两个文档,每个文档的 content 域包含如下内容:
-
The quick brown fox jumped over the lazy dog
-
Quick brown foxes leap over lazy dogs in summer
为了创建倒排索引,我们首先将每个文档的 content 域拆分成单独的 词(我们称它为 词条 或 tokens ),创建一个包含所有不重复词条的排序列表,然后列出每个词条出现在哪个文档。结果如下所示:
现在,如果我们想搜索 quick brown ,我们只需要查找包含每个词条的文档:
两个文档都匹配,但是第一个文档比第二个匹配度更高。如果我们使用仅计算匹配词条数量的简单相似性算法 ,那么,我们可以说,对于我们查询的相关性来讲,第一个文档比第二个文档更佳。
但是,我们目前的倒排索引有一些问题:
-
Quick 和 quick 以独立的词条出现,然而用户可能认为它们是相同的词。
-
fox 和 foxes 非常相似, 就像 dog 和 dogs ;他们有相同的词根。
-
jumped 和 leap, 尽管没有相同的词根,但他们的意思很相近。他们是同义词。
使用前面的索引搜索 +Quick +fox 不会得到任何匹配文档。(记住,+ 前缀表明这个词必须存在。)只有同时出现 Quick 和 fox 的文档才满足这个查询条件,但是第一个文档包含 quick fox ,第二个文档包含 Quick foxes 。
我们的用户可以合理的期望两个文档与查询匹配。我们可以做的更好。如果我们将词条规范为标准模式,那么我们可以找到与用户搜索的词条不完全一致,但具有足够相关性的文档。例如:
-
Quick 可以小写化为 quick 。
-
foxes 可以 词干提取 --变为词根的格式-- 为 fox 。类似的, dogs 可以为提取为 dog 。
-
jumped 和 leap 是同义词,可以索引为相同的单词 jump 。
现在索引看上去像这样:
这还远远不够。我们搜索 +Quick +fox 仍然 会失败,因为在我们的索引中,已经没有 Quick 了。但是,如果我们对搜索的字符串使用与 content 域相同的标准化规则,会变成查询 +quick +fox ,这样两个文档都会匹配!
更多推荐
所有评论(0)