java 從零開始手寫 redis(六)redis AOF 持久化原理詳解及實現

前言

java從零手寫實現redis(一)如何實現固定大小的緩存?java

java從零手寫實現redis(三)redis expire 過時原理git

java從零手寫實現redis(三)內存數據如何重啓不丟失?github

java從零手寫實現redis(四)添加監聽器redis

java從零手寫實現redis(五)過時策略的另外一種實現思路spring

咱們前面簡單實現了 redis 的幾個特性,java從零手寫實現redis(三)內存數據如何重啓不丟失? 中實現了相似 redis 的 RDB 模式。json

redis aof 基礎

Redis AOF 持久化詳解api

AOF 的一些我的理解

爲何選擇 AOF?

AOF 模式的性能特別好,有多好呢?緩存

用過 kafka 的同窗確定知道,kafka 也用到了順序寫這個特性。安全

順序寫添加文件內容,避免了文件 IO 的隨機寫問題,性能基本能夠和內存媲美。多線程

AOF 的實時性更好,這個是相對於 RDB 模式而言的。

咱們原來使用 RDB 模式,將緩存內容所有持久化,這個是比較耗時的動做,通常是幾分鐘持久化一次。

AOF 模式主要是針對修改內容的指令,而後將全部的指令順序添加到文件中。這樣的話,實時性會好不少,能夠提高到秒級別,甚至秒級別。

AOF 的吞吐量

AOF 模式能夠每次操做都進行持久化,可是這樣會致使吞吐量大大降低。

提高吞吐量最經常使用的方式就是批量,這個 kafka 中也是相似的,好比咱們能夠 1s 持久化一次,將 1s 內的操做所有放入 buffer 中。

這裏其實就是一個 trade-off 問題,實時性與吞吐量的平衡藝術。

實際業務中,1s 的偏差通常都是能夠接受的,因此這個也是業界比較承認的方式。

AOF 的異步+多線程

kafka 中全部的操做實際上都是異步+回調的方式實現的。

異步+多線程,確實能夠提高操做的性能。

固然 redis 6 之前,其實一直是單線程的。那爲何性能依然這麼好呢?

其實多線程也有代價,那就是線程上下文的切換是須要耗時的,保持併發的安全問題,也須要加鎖,從而下降性能。

因此這裏要考慮異步的收益,與付出的耗時是否成正比的問題。

AOF 的落盤

咱們 AOF 與 RDB 模式,歸根結底都是基於操做系統的文件系統作持久化的。

對於開發者而言,可能就是調用一個 api 就實現了,可是實際持久化落盤的動做並不見得就是一步完成的。

文件系統爲了提高吞吐量,也會採用相似 buffer 的方式。這突然有一點俄羅斯套娃的味道。

可是優秀的設計老是類似的,好比說緩存從 cpu 的設計中就有 L1/L2 等等,思路是一致的。

阿里的不少開源技術,都會針對操做系統的落盤作進一步的優化,這個咱們後續作深刻學習。

AOF 的缺陷

大道缺一,沒有銀彈。

AOF 千好萬好,和 RDB 對比也存在一個缺陷,那就是指令

java 實現

接口

接口和 rdb 的保持一致

/**
 * 持久化緩存接口
 * @author binbin.hou
 * @since 0.0.7
 * @param <K> key
 * @param <V> value
 */
public interface ICachePersist<K, V> {

    /**
     * 持久化緩存信息
     * @param cache 緩存
     * @since 0.0.7
     */
    void persist(final ICache<K, V> cache);

}

註解定義

爲了和耗時統計,刷新等特性保持一致,對於操做類的動做才添加到文件中(append to file)咱們也基於註解屬性來指定,而不是固定寫死在代碼中,便於後期拓展調整。

/**
 * 緩存攔截器
 * @author binbin.hou
 * @since 0.0.5
 */
@Documented
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CacheInterceptor {

    /**
     * 操做是否須要 append to file,默認爲 false
     * 主要針對 cache 內容有變動的操做,不包括查詢操做。
     * 包括刪除,添加,過時等操做。
     * @return 是否
     * @since 0.0.10
     */
    boolean aof() default false;

}

咱們在原來的 @CacheInterceptor 註解中添加 aof 屬性,用於指定是否對操做開啓 aof 模式。

指定 aof 模式的方法

咱們在會對數據形成變動的方法上指定這個註解屬性:

過時操做

相似於 spring 的事務攔截器,咱們使用代理類調用 expireAt。

expire 方法就不須要添加 aof 攔截了。

/**
 * 設置過時時間
 * @param key         key
 * @param timeInMills 毫秒時間以後過時
 * @return this
 */
@Override
@CacheInterceptor
public ICache<K, V> expire(K key, long timeInMills) {
    long expireTime = System.currentTimeMillis() + timeInMills;
    // 使用代理調用
    Cache<K,V> cachePoxy = (Cache<K, V>) CacheProxy.getProxy(this);
    return cachePoxy.expireAt(key, expireTime);
}

/**
 * 指定過時信息
 * @param key key
 * @param timeInMills 時間戳
 * @return this
 */
@Override
@CacheInterceptor(aof = true)
public ICache<K, V> expireAt(K key, long timeInMills) {
    this.expire.expire(key, timeInMills);
    return this;
}

變動操做

@Override
@CacheInterceptor(aof = true)
public V put(K key, V value) {
    //1.1 嘗試驅除
    CacheEvictContext<K,V> context = new CacheEvictContext<>();
    context.key(key).size(sizeLimit).cache(this);
    boolean evictResult = evict.evict(context);
    if(evictResult) {
        // 執行淘汰監聽器
        ICacheRemoveListenerContext<K,V> removeListenerContext = CacheRemoveListenerContext.<K,V>newInstance().key(key).value(value).type(CacheRemoveType.EVICT.code());
        for(ICacheRemoveListener<K,V> listener : this.removeListeners) {
            listener.listen(removeListenerContext);
        }
    }
    //2. 判斷驅除後的信息
    if(isSizeLimit()) {
        throw new CacheRuntimeException("當前隊列已滿,數據添加失敗!");
    }
    //3. 執行添加
    return map.put(key, value);
}

@Override
@CacheInterceptor(aof = true)
public V remove(Object key) {
    return map.remove(key);
}

@Override
@CacheInterceptor(aof = true)
public void putAll(Map<? extends K, ? extends V> m) {
    map.putAll(m);
}

@Override
@CacheInterceptor(refresh = true, aof = true)
public void clear() {
    map.clear();
}

AOF 持久化攔截實現

持久化對象定義

/**
 * AOF 持久化明細
 * @author binbin.hou
 * @since 0.0.10
 */
public class PersistAofEntry {

    /**
     * 參數信息
     * @since 0.0.10
     */
    private Object[] params;

    /**
     * 方法名稱
     * @since 0.0.10
     */
    private String methodName;

    //getter & setter &toString
}

這裏咱們只須要方法名,和參數對象。

暫時實現的簡單一些便可。

持久化攔截器

咱們定義攔截器,當 cache 中定義的持久化類爲 CachePersistAof 時,將操做的信息放入到 CachePersistAof 的 buffer 列表中。

public class CacheInterceptorAof<K,V> implements ICacheInterceptor<K, V> {

    private static final Log log = LogFactory.getLog(CacheInterceptorAof.class);

    @Override
    public void before(ICacheInterceptorContext<K,V> context) {
    }

    @Override
    public void after(ICacheInterceptorContext<K,V> context) {
        // 持久化類
        ICache<K,V> cache = context.cache();
        ICachePersist<K,V> persist = cache.persist();

        if(persist instanceof CachePersistAof) {
            CachePersistAof<K,V> cachePersistAof = (CachePersistAof<K,V>) persist;

            String methodName = context.method().getName();
            PersistAofEntry aofEntry = PersistAofEntry.newInstance();
            aofEntry.setMethodName(methodName);
            aofEntry.setParams(context.params());

            String json = JSON.toJSONString(aofEntry);

            // 直接持久化
            log.debug("AOF 開始追加文件內容:{}", json);
            cachePersistAof.append(json);
            log.debug("AOF 完成追加文件內容:{}", json);
        }
    }

}

攔截器調用

當 AOF 的註解屬性爲 true 時,調用上述攔截器便可。

這裏爲了不浪費,只有當持久化類爲 AOF 模式時,才進行調用。

//3. AOF 追加
final ICachePersist cachePersist = cache.persist();
if(cacheInterceptor.aof() && (cachePersist instanceof CachePersistAof)) {
    if(before) {
        persistInterceptors.before(interceptorContext);
    } else {
        persistInterceptors.after(interceptorContext);
    }
}

AOF持久化實現

這裏的 AOF 模式和之前的 RDB 持久化類只是不一樣的模式,實際上兩者是相同的接口。

接口

這裏咱們統必定義了不一樣的持久化類的時間,便於 RDB 與 AOF 不一樣任務的不一樣時間間隔觸發。

public interface ICachePersist<K, V> {

    /**
     * 持久化緩存信息
     * @param cache 緩存
     * @since 0.0.7
     */
    void persist(final ICache<K, V> cache);

    /**
     * 延遲時間
     * @return 延遲
     * @since 0.0.10
     */
    long delay();

    /**
     * 時間間隔
     * @return 間隔
     * @since 0.0.10
     */
    long period();

    /**
     * 時間單位
     * @return 時間單位
     * @since 0.0.10
     */
    TimeUnit timeUnit();
}

持久化類實現

實現一個 Buffer 列表,用於每次攔截器直接順序添加。

持久化的實現也比較簡單,追加到文件以後,直接清空 buffer 列表便可。

/**
 * 緩存持久化-AOF 持久化模式
 * @author binbin.hou
 * @since 0.0.10
 */
public class CachePersistAof<K,V> extends CachePersistAdaptor<K,V> {

    private static final Log log = LogFactory.getLog(CachePersistAof.class);

    /**
     * 緩存列表
     * @since 0.0.10
     */
    private final List<String> bufferList = new ArrayList<>();

    /**
     * 數據持久化路徑
     * @since 0.0.10
     */
    private final String dbPath;

    public CachePersistAof(String dbPath) {
        this.dbPath = dbPath;
    }

    /**
     * 持久化
     * key長度 key+value
     * 第一個空格,獲取 key 的長度,而後截取
     * @param cache 緩存
     */
    @Override
    public void persist(ICache<K, V> cache) {
        log.info("開始 AOF 持久化到文件");
        // 1. 建立文件
        if(!FileUtil.exists(dbPath)) {
            FileUtil.createFile(dbPath);
        }
        // 2. 持久化追加到文件中
        FileUtil.append(dbPath, bufferList);

        // 3. 清空 buffer 列表
        bufferList.clear();
        log.info("完成 AOF 持久化到文件");
    }

    @Override
    public long delay() {
        return 1;
    }

    @Override
    public long period() {
        return 1;
    }

    @Override
    public TimeUnit timeUnit() {
        return TimeUnit.SECONDS;
    }

    /**
     * 添加文件內容到 buffer 列表中
     * @param json json 信息
     * @since 0.0.10
     */
    public void append(final String json) {
        if(StringUtil.isNotEmpty(json)) {
            bufferList.add(json);
        }
    }

}

持久化測試

測試代碼

ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .persist(CachePersists.<String, String>aof("1.aof"))
        .build();
cache.put("1", "1");
cache.expire("1", 10);
cache.remove("2");
TimeUnit.SECONDS.sleep(1);

測試日誌

expire 實際上調用的是 expireAt。

[DEBUG] [2020-10-02 12:20:41.979] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 開始追加文件內容:{"methodName":"put","params":["1","1"]}
[DEBUG] [2020-10-02 12:20:41.980] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件內容:{"methodName":"put","params":["1","1"]}
[DEBUG] [2020-10-02 12:20:41.982] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 開始追加文件內容:{"methodName":"expireAt","params":["1",1601612441990]}
[DEBUG] [2020-10-02 12:20:41.982] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件內容:{"methodName":"expireAt","params":["1",1601612441990]}
[DEBUG] [2020-10-02 12:20:41.984] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 開始追加文件內容:{"methodName":"remove","params":["2"]}
[DEBUG] [2020-10-02 12:20:41.984] [main] [c.g.h.c.c.s.i.a.CacheInterceptorAof.after] - AOF 完成追加文件內容:{"methodName":"remove","params":["2"]}
[DEBUG] [2020-10-02 12:20:42.088] [pool-1-thread-1] [c.g.h.c.c.s.l.r.CacheRemoveListener.listen] - Remove key: 1, value: 1, type: expire
[INFO] [2020-10-02 12:20:42.789] [pool-2-thread-1] [c.g.h.c.c.s.p.InnerCachePersist.run] - 開始持久化緩存信息
[INFO] [2020-10-02 12:20:42.789] [pool-2-thread-1] [c.g.h.c.c.s.p.CachePersistAof.persist] - 開始 AOF 持久化到文件
[INFO] [2020-10-02 12:20:42.798] [pool-2-thread-1] [c.g.h.c.c.s.p.CachePersistAof.persist] - 完成 AOF 持久化到文件
[INFO] [2020-10-02 12:20:42.799] [pool-2-thread-1] [c.g.h.c.c.s.p.InnerCachePersist.run] - 完成持久化緩存信息

文件內容

1.aof 的文件內容以下

{"methodName":"put","params":["1","1"]}
{"methodName":"expireAt","params":["1",1601612441990]}
{"methodName":"remove","params":["2"]}

將每一次的操做,簡單的存儲到文件中。

AOF 加載實現

加載

相似於 RDB 的加載模式,aof 的加載模式也是相似的。

咱們須要根據文件的內容,還原之前的緩存的內容。

實現思路:遍歷文件內容,反射調用原來的方法。

代碼實現

解析文件

@Override
public void load(ICache<K, V> cache) {
    List<String> lines = FileUtil.readAllLines(dbPath);
    log.info("[load] 開始處理 path: {}", dbPath);
    if(CollectionUtil.isEmpty(lines)) {
        log.info("[load] path: {} 文件內容爲空,直接返回", dbPath);
        return;
    }

    for(String line : lines) {
        if(StringUtil.isEmpty(line)) {
            continue;
        }
        // 執行
        // 簡單的類型還行,複雜的這種反序列化會失敗
        PersistAofEntry entry = JSON.parseObject(line, PersistAofEntry.class);
        final String methodName = entry.getMethodName();
        final Object[] objects = entry.getParams();
        final Method method = METHOD_MAP.get(methodName);
        // 反射調用
        ReflectMethodUtil.invoke(cache, method, objects);
    }
}

方法映射的預加載

Method 反射是固定的,爲了提高性能,咱們作一下預處理。

/**
 * 方法緩存
 *
 * 暫時比較簡單,直接經過方法判斷便可,沒必要引入參數類型增長複雜度。
 * @since 0.0.10
 */
private static final Map<String, Method> METHOD_MAP = new HashMap<>();
static {
    Method[] methods = Cache.class.getMethods();
    for(Method method : methods){
        CacheInterceptor cacheInterceptor = method.getAnnotation(CacheInterceptor.class);
        if(cacheInterceptor != null) {
            // 暫時
            if(cacheInterceptor.aof()) {
                String methodName = method.getName();
                METHOD_MAP.put(methodName, method);
            }
        }
    }
}

測試

文件內容

  • default.aof
{"methodName":"put","params":["1","1"]}

測試

ICache<String, String> cache = CacheBs.<String,String>newInstance()
        .load(CacheLoads.<String, String>aof("default.aof"))
        .build();

Assert.assertEquals(1, cache.size());
System.out.println(cache.keySet());

直接將 default.aof 文件加載到 cache 緩存中。

小結

redis 的文件持久化,實際上更加豐富。

能夠支持 rdb 和 aof 兩種模式混合使用。

aof 模式的文件體積會很是大,redis 爲了解決這個問題,會定時對命令進行壓縮處理。

能夠理解爲 aof 就是一個操做流水錶,咱們實際上關心的只是一個終態,不論中間通過了多少步驟,咱們只關心最後的值。

文中主要講述了思路,實現部分由於篇幅限制,沒有所有貼出來。

開源地址: https://github.com/houbb/cache

以爲本文對你有幫助的話,歡迎點贊評論收藏關注一波~

你的鼓勵,是我最大的動力~