1 本地缓存简单介绍

    本地缓存因为少了网络传输环节,所以读取速度比分布式缓存要快一些。HashMap、ConcurrentHashMap也能用作本地缓存,但是因为缺少必要的过期机制、容量限制、数据淘汰策略,不太合适。本文介绍guava cache本地缓存的用法,重点介绍下两种过期机制的区别:refreshAfterWrite和expireAfterWrite。

2 guava cache

guava cache的过期机制:

  • refreshAfterWrite:在过期后,不清空缓存,在下一次获取时,只有一个线程去load最新值(该线程阻塞),其他线程仍返回旧值(不阻塞);
  • expireAfterWrite:在过期后,清空缓存,在下一次获取时,所有线程阻塞住,等待其中一个load到新值。

    两种过期策略在过期时都不会主动刷新缓存,只有下一次请求到来时才会刷新。

    示例:缓存过期时间设置为3s,启动3个线程,其中Thread-1、Thread-2并发获取缓存,每5s获取一次,Thread-3每秒查询一次缓存内容,缓存load时休息5s,模拟下load缓存的延时。

2.1 expireAfterWrite

package cache.guava;

import com.google.common.cache.*;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

/**
 * @Description guava本地缓存
 * @Author lilong
 * @Date 2019-03-14 19:55
 */
public class GuavaCacheService {
    private static LoadingCache<String, String> localCache = CacheBuilder.newBuilder()
            .maximumSize(5) // 缓存容量设为5,实际上只用到1个
            .expireAfterWrite(3, TimeUnit.SECONDS) // 一个线程load,其他线程阻塞等待
            .removalListener((removalNotification) -> { // 匿名内部类RemovalListener
                        System.out.println(Thread.currentThread().getName() + ":已移除");
                    }
            )
            .build(new CacheLoader<String, String>() {
                @Override
                public String load(String key) throws InterruptedException {
                    System.out.println(Thread.currentThread().getName() + ":开始移除key..." + key);
                    Thread.sleep(5000);
                    LocalDateTime dateTime = LocalDateTime.now();
                    String time = dateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
                    System.out.println(Thread.currentThread().getName() + ":移除成功key..." + key);
                    return "key_" + key + "_" + time;
                }
            });

    public static void main(String[] args) {
        new Thread(new GetCache(), "Thread-1").start();
        new Thread(new GetCache(), "Thread-2").start();

        new Thread(() -> {
            while (true) {
                System.out.println(localCache.asMap());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Thread-3").start();
    }

    static class GetCache implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    String val = localCache.get("a");
                    System.out.println(Thread.currentThread().getName() + ":" + val);
                    Thread.sleep(5000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

运行看下打印,可以看到:

1)缓存存活时间3s,到期自动删除;

2)在获取缓存新值时,所有线程阻塞等待、直到有一个线程load成功;

3)获取到的新值时间戳比旧的延后10s,而不是load的延时5s,还没搞懂为什么

2.2 过期机制:refreshAfterWrite

把上面代码的expireAfterWrite换成refreshAfterWrite,打印日志,看到

1)缓存过期后不会清除;

2)在一个线程load阻塞住时,其他线程仍然能返回旧值,不会阻塞住。

​3)获取到的新值时间戳比旧的延后也是10s,而不是load的延时5s

3 解决分布式环境下本地缓存不一致问题

(1)kafka:

  • 生产者:每一台机器都是一个生产者,只要本地缓存有变化,就把变化的缓存投递到kafka的一个topic里面。可以投递全量缓存,也可以投递增量缓存。
  • 消费者:每一台机器也都是一个消费者,订阅上面的topic,重点在于,每个消费者都定义不同的groupId,这样就实现了消息的广播,只要有一台机器缓存变了,所有机器都能收到。

(2)zk:

    每台机器注册到zk上面去,类似dubbo服务注册的方式,所有机器都注册一个watch,一有变化全部通知,然后大家再去那一台机器取缓存。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐