RxJS 缓存高级教程(热血教师美国版免费)

网友投稿 232 2022-06-15


在开发 Web 应用程序时,性能一般都是出于最高优先级的。对于 Angular 项目,我们有很多途径去提升程序性能,例如摇树优化(tree-shaking)、AoT(ahead-of-time 编译)、模块懒加载(lazy loading)以及缓存。为了能够更好地概览全局,以便提高 Angular 应用程序的性能,我们强烈推荐使用 Minko Gechev 的 Angular 性能检查表 。本文主要聚焦于 缓存 。

事实上,缓存是提升网站性能的最有效的方法之一,尤其是当用户出于受限网络带宽或慢速网络的情况。

有很多种缓存数据或资源的方法。静态资源通常使用标准的浏览器缓存或 Service Worker 进行缓存。当然,Service Worker 也可以缓存 API 请求,但它们更适合缓存图片、HTML、JS 或 CSS 等文件。缓存系统数据,我们则会选用另外的机制。

不管我们选择怎样的机制,缓存都会 改善系统的响应性 , 降低网络消耗 , 在网络中断的情况下依然能够使用内容 。换句话说,当内容在更接近用户的地方被缓存,例如就在客户端,请求就不会引起另外的网络活动;缓存的数据可以被更快返回,因为我们不需要进行完整的网络周期。

这篇文章我们将使用 RxJS 以及 Angular 提供的各种工具实现一种高级的缓存机制。

动机

一直以来,都有一个疑问:如何在一个频繁使用 Observable 对象的 Angular 程序中缓存数据。很多人都知道如何使用 Promise 缓存数据,但是对如何在函数式的响应式编程中缓存数据束手无策。因为后者的复杂性(庞大的 API)、完全不同的使用方式(从命令式编程到指令式编程)以及许多概念。因此,将基于 Promise 的缓存系统移植到 Observable 是非常困难的,尤其是还需要实现一些高级功能的时候。

Angular 应用程序通常使用由 HttpClientModule 提供的 HTTPClient 实现 HTTP 请求。它的所有 API 都是基于 Observable 的。这意味着, HTTPClient 的函数,例如 get 、 post 、 put 和 delete 都返回一个 Observable 。 Observable 天生是懒的,只有当我们调用了 subscribe 函数之后才会发送请求。但是,对同一个 Observable 多次调用 subscribe 函数,会一遍一遍地创建源 Observable 对象,也就是为每一次定于执行一次请求。我们将这种模式成为冷模式(cold)。

如果你对此完全不了解,可以阅读我们的另外一篇文章 《Observable 的冷模式和热模式》 。

这种行为使得实现 Observable 缓存机制变得有些棘手。简单的实现通常需要大量固定模式的代码,而且最终可能需要绕过 RxJS。这是一种解决思路,但如果我们依然希望使用 Observable 的强大功能,这种实现就不值得推荐。简单来说,我们并不想给法拉利安装一个摩托车引擎,对吧?

需求

在我们深入代码之前,首先定义好我们的高级缓存机制的需求。

我们要开发一个名为 笑话世界 的应用。这是一个简单的 app,随机显示给定分类里面的笑话。为了尽可能简单,我们只给出一个分类。

这个 app 有三个组件: AppComponent 、 DashboardComponent 和 JokeListComponent 。

AppComponent 是程序入口,显示一个工具栏和一个安装当前路由状态填充的

DashboardComponent 只用来显示分类列表。我们可以从这里导航到 JokeListComponent 。 JokeListComponent 负责将笑话列表显示到屏幕。

笑话由 Angular 的 HttpClient 服务从服务器获取。为保持组件的响应、解耦,我们要创建一个 JokeService ,来帮助我们获取数据。组件只需要注入这个服务,通过其公开的 API 访问数据即可。

以上所有都是我们的系统架构,并没有引入缓存。

当我们从主页到列表视图时,我们可以从缓存请求数据,而不是每次都从服务器获取。缓存中的数据每 10 秒自动更新。

当然,每 10 秒获取数据并不是每个产品都需要遵守的固定准则,我们可能需要更复杂的实现来更新缓存(例如使用 web socket 推送更新)。但是,现在我们可以尽可能保持简单,集中精力解决缓存的问题。

无论如何,我们都希望收到某种更新的通知。就我们的程序而言,我们不希望自动更新 UI( JokeListComponent )的数据,而是用户要求 UI 更新时才去更新缓存。为什么?想象下这样的场景:用户正在阅读一个笑话,因为数据的自动更新,所有笑话突然都消失了。这无疑非常令人反感,是一种非常差的用户体验。因此,我们的用户在有新数据的时候会收到通知。

为了更有趣一点,我们还希望用户能够强制刷新缓存。这与仅仅更新 UI 不同,因为强制刷新意味着要从服务器请求数据、更新缓存、然后更新 UI。

现在我们总结一下我们想要干什么:

程序有两个组件,当从组件 A 导航到组件 B 时,组件 B 的数据最好从缓存获取,而不是每次都从服务器获取

每 10 秒更新缓存

UI 中的数据并不会自动更新,而是由用户强制更新

用户可以强制刷新,从服务器重新获取数据、更新缓存和 UI

下面是我们即将构建的 app 预览图:

实现基本的缓存

我们从一个简单的实现开始,逐渐过渡到最终的全功能版本。

第一步是创建一个新的服务。

然后,我们添加两个接口,一个用来描述 Joke 的属性,另一个用来描述 HTTP 请求的返回。这样的接口会让我们的程序更符合 TypeScript 的要求,同时也更方便开发。

export interface Joke {

  id: number;

  joke: string;

  categories: Array;

}

 

export interface JokeResponse {

  type: string;

  value: Array;

}

下面我们实现 JokeService 。我们不想透露数据究竟是从缓存获取的,还是从服务器获取的,因此,我们只提供一个返回值类型为 Observable 的 jokes 属性,用于获取笑话列表。

为了执行 HTTP 请求,我们需要为我们的服务注入 HttpClient 。

下面是 JokeService 的代码框架:

import { Injectable } from '@angular/core';

import { HttpClient } from '@angular/common/http';

 

@Injectable()

export class JokeService {

 

  constructor(private http: HttpClient) { }

 

  get jokes() {

    ...

  }

}

下面,我们实现一个私有的 requestJokes() 函数,通过 HttpClient 的 GET 请求获取笑话列表。

import { map } from 'rxjs/operators';

 

@Injectable()

export class JokeService {

 

  constructor(private http: HttpClient) { }

 

  get jokes() {

    ...

  }

 

  private requestJokes() {

    return this.http.get(API_ENDPOINT).pipe(

      map(response => response.value)

    );

  }

}

现在,我们有了实现获取笑话的函数的一切准备。

一个显而易见的实现是,直接返回 this.requestJokes() ,但这无法满足我们的需要。我们知道,所有 HttpClient 暴露的函数,例如 get() ,都是返回一个冷 Observable 。这意味着每个订阅者都会重新出发完整的数据流,从而带来额外的 HTTP 请求。毕竟,缓存的意义就在于提高系统的加载时间,将网络请求限制到最低的水平。

所以,我们想让我们的流变成热的。不仅仅如此,每一个新的订阅者应该获取到最近的缓存值。事实上,有一个很方便的操作符可以实现这一点: shareReplay 。这个操作符返回一个 Observable 对象。该对象会在底层共享一个订阅,也就是 this.requestJokes() 返回的那个 Observable 。

另外, shareReplay 接受一个可选参数 bufferSize ,对于我们的用例非常有用。 bufferSize 决定了重现缓存(replay buffer)的最大元素数,也就是被缓存、能够重现给每一个订阅者的元素数。在我们的场景中,我们只需要最近的一个值,因此将 bufferSize 设置为 1。

我们看一下实际代码,看看我们刚刚学到了什么:

import { Observable } from 'rxjs/Observable';

import { shareReplay, map } from 'rxjs/operators';

 

const API_ENDPOINT = 'https://api.icndb.com/jokes/random/5?limitTo=[nerdy]';

const CACHE_SIZE = 1;

 

@Injectable()

export class JokeService {

  private cache$: Observable>;

 

  constructor(private http: HttpClient) { }

 

  get jokes() {

    if (!this.cache$) {

      this.cache$ = this.requestJokes().pipe(

        shareReplay(CACHE_SIZE)

      );

    }

 

    return this.cache$;

  }

 

  private requestJokes() {

    return this.http.get(API_ENDPOINT).pipe(

      map(response => response.value)

    );

  }

}

好了,我们已经讨论过上面的大部分代码。但等等,私有的 cache$ 属性以及访问函数里面的 if 语句是什么意思?答案很简单。如果我们直接返回 this.requestJokes().pipe(shareReplay(CACHE_SIZE)) ,那么,每一个订阅者都会创建一个新的缓存实例。但是,我们想要所有订阅者共享一个实例。因此,我们将这个实例保持在私有的 cache$ 属性中,在第一次调用的时候初始化这个属性。这样,所有订阅者都会访问到这一个共享实例,而不是每次创建一个新的对象。

我们看一下上面代码实现的更直观的表示:

上图是一个 时序图 ,它描述了场景中涉及的对象,请求一个笑话列表,以及对象之间交换信息的时序。现在我们暂停一下,了解下发生了什么。

我们从导航到列表组件的仪表盘开始。

组件初始化之后,Angular 会调用 ngOnInit 生命周期钩子。在这里,我们调用 JokeService 暴露的访问器 jokes 请求笑话列表。由于这是我们第一次请求数据,所以缓存是空的,并且没有初始化,这意味着 JokeService.cache$ 是 undefined 。我们在访问器内部调用了 requestJokes() 。这会返回我们一个 Observable 对象,其数据来自服务器。同时,我们使用 shareReplay 运算符来获得所期望的行为。

shareReplay 操作符会在原始源于未来所有的订阅者之间自动创建一个 ReplaySubject 。只要订阅者数量从零变为一,它就会将这个 Subject 关联到底层数据源,然后广播其所有值。未来所有订阅者都会关联到这个 Subject ,所以实际上只有一个订阅关联到底层的那个冷 Observable 。这被称为 多播 (multicasting),定义了我们的简单缓存的基础。

一旦数据从服务器获取到,就会被缓存。

注意,在时序图中, Cache 是一个独立的对象,目的是用来说明那个从消费者(订阅者)到底层源(HTTP 请求)之间创建的 ReplaySubject 。

下一次我们为列表中组件请求数据的时候,我们的缓存就会发送最近的值给消费者。这时候并不会有额外的 HTTP 调用。

很简单,对吧?

为了真正的区别开来,我们更进一步,从 Observable 的层次看看缓存是如何工作的。现在我们使用 弹子图 (marble diagram)看看流:

弹子图非常清晰地显示出,底层 Observable 只有一个订阅,所有消费者都订阅到这个共享的 Observable ,也就是这个 ReplaySubject 对象。我们也能够看出,只有第一个订阅者触发了 HTTP 调用,其余的都是直接获取重现的值。

最后,我们看一下 JokeListComponent 是如何显示数据的。首先,注入 JokeService 对象。之后,在 ngOnInit 中,使用服务对象暴露的访问器初始化一个 jokes$ 属性。这个访问器会返回一个 Array 类型的 Observable 对象,这正是我们所需要的。

@Component({

  ...

})

export class JokeListComponent implements OnInit {

  jokes$: Observable>;

 

  constructor(private jokeService: JokeService) { }

 

  ngOnInit() {

    this.jokes$ = this.jokeService.jokes;

  }

 

  ...

}

注意,我们并没有马上订阅 jokes$ ,而是在模板中使用了 async 管道,因为这个管道充满了奇迹。好奇吗?请阅读文章 《了解关于 AsyncPipe 你不知道的三件事情》 。

...

太棒了!这就是我们实际使用的简单缓存。为了验证是不是只有一次请求,打开 Chrome 的 DevTools,点击 Network 选项卡,选择 XHR。开启仪表盘,导航到列表视图,然后再导航回来。

自动更新

现在我们用几行代码构建了一个简单的缓存机制。事实上,很多工作都是由 shareReplay 操作符完成的。这个操作符会实现缓存和重现大多数数据值。

当数据不会在后台更新时,这就已经很好地工作了。那么,如果数据每隔几分钟就会改变了呢?我们当然不应该强制用户为了从服务器获取最新数据必须要刷新这个页面。

如果我们的缓存每隔 10 秒钟就会在后台更新呢?是不是很酷?肯定的!作为用户,我们不需要重新加载页面;数据改变后,UI 会随之更新。再说一遍,在真实的应用中,我们一般不会主动拉取数据,而是要服务器 推送 通知。对于我们的小示例程序而言,能够 每隔 10 秒刷新 一次就很好了。

这种实现很简单。简而言之,我们需要创建一个 Observable ,根据给定的时间间隔发出一系列值。或者简单来说,就是我们需要每 X 毫秒产生一个值。对达到这一目的,我们有很多种实现。

第一个选择是使用 interval 操作符。这个操作符要求一个可选参数,定义了每次发送值得时间。考虑下面的代码:

import { interval } from 'rxjs/observable/interval';

 

interval(10000).subscribe(console.log);

这里我们创建了一个能够发送无限整数序列的 Observable 对象。该对象每隔 10 秒会发出一个整数值。这意味着第一个值也会在延迟给定时间之后才会发出。为了更好理解这一行为,我们可以看一下 interval 的弹子图。

对,就像我们想的那样。这一个值会“延迟”,这不是我们想要的。为什么?因为如果我们从仪表盘导航到列表组件,希望阅读一些有趣的笑话,我们得等待 10 秒钟,才会从服务器请求数据,然后显示到屏幕。

我们通过引入另外一个操作符来解决这一问题。这个操作符是 startWith(value) ,可以先发出一个给定值作为初始值。但我们可以做得更好!

我会告诉你,其实有一个操作符可以在给定的一段是时间之后(初识延迟)按照特定时间(正常间隔)发出一个值的序列。这就是 timer 。

可视化时间!

酷!但这真的解决我们的问题了吗?是的。如果我们将初始值设置为 0,将间隔时间设置为 10 秒,我们就会有类似 interval(10000).pipe(startWith(0)) 的行为,但只用了一个操作符。

让我们把这种实现带到我们的缓存机制中去吧。

我们需要建立一个 定时器 ,每次触发都发送一个 HTTP 请求,去服务器获取新的数据。也就是说,每一次定时器触发,我们需要使用 switchMap 转换到一个 Observable 对象,在订阅时获取新的笑话列表。使用 switchMap 还有一个额外的好处是,我们可以避免竞争条件。这是这个操作符天生就有的特点,它会取消 Observable 之前的订阅,仅仅为最新的对象发出值。

我们的缓存的剩余部分不需要改变,意味着我们的流还是多播的,所有的订阅者共享一个底层源。

再说一遍, shareReplay 天生就会将新的值广播给所有订阅者,并且将最近的值发送给新的订阅者。

正如我们在弹子图看到的那样, timer 每 10 秒发出一个值。每一个值都会转换成一个内部 Observable 对象,去获取我们所需要的数据。因为我们使用了 switchMap ,我们避免了竞争条件,因此消费者只会接收到值 1 和 3。内部 Observable 对象发出的第二个值会被“跳过”,因为新值到了的时候已经取消了。

让我们利用学到的知识,更新下 JokeService 。

import { timer } from 'rxjs/observable/timer';

import { switchMap, shareReplay } from 'rxjs/operators';

 

const REFRESH_INTERVAL = 10000;

 

@Injectable()

export class JokeService {

  private cache$: Observable>;

 

  constructor(private http: HttpClient) { }

 

  get jokes() {

    if (!this.cache$) {

      // Set up timer that ticks every X milliseconds

      const timer$ = timer(0, REFRESH_INTERVAL);

 

      // For each tick make an http request to fetch new data

      this.cache$ = timer$.pipe(

        switchMap(_ => this.requestJokes()),

        shareReplay(CACHE_SIZE)

      );

    }

 

    return this.cache$;

  }

 

  ...

}

厉害!想自己试试吗?下面是一个现实的示例。从仪表盘开始,到列表组件,然后看看有什么魔法出现。等几秒钟,就可以看到更新的动作。记住,缓存每 10 秒刷新一次,但通过修改 REFRESH_INTERVAL 的值就可以改变这一间隔。

发送更新通知

让我们回顾一下目前所构建的内容。

当我们通过 JokeService 请求数据时,我们希望数据从缓存获得,而不是每次都去请求服务器。缓存的底层数据每 10 秒刷新一次。刷新之后,数据会推送给组件,组件自动更新。

这有点问题。想象一下,如果我们是一个用户,正在阅读某个笑话,突然间这个笑话消失了,因为 UI 自动更新了。这无疑非常讨厌,是很坏的用户体验。

因此,我们的用户应该在有新数据时获得 通知 。换句话说,我们希望用户自己去更新 UI。

事实证明,我们不需要修改服务来实现这个功能。这个逻辑很简单。毕竟,我们的服务不应该关心发送通知的问题,视图应该负责何时怎样更新屏幕的数据。

首先,我们需要给用户展示一个 初始值 ,否则的话在第一次缓存更新之前,屏幕就是空白的。我们马上就会看到原因。设置一个初始化的流与调用访问器函数一样简单。另外,既然我们只关心第一次的值,我们可以使用 take 操作符。

为了逻辑上的可读性,我们创建一个副主函数 getDataOnce() 。

import { take } from 'rxjs/operators';

 

@Component({

  ...

})

export class JokeListComponent implements OnInit {

  ...

  ngOnInit() {

    const initialJokes$ = this.getDataOnce();

    ...

  }

 

  getDataOnce() {

    return this.jokeService.jokes.pipe(take(1));

  }

  ...

}

从我们的需求可以知道,我们只想在用户真正需要更新 UI 的时候出去更新,而不是自动更新。你会问,用户如何要求更新界面?当用户点击了“更新”按钮时,我们就知道用户想要更新 UI。这个按钮和通知一起显示。现在,我们不去关心通知,把注意力集中在点击按钮之后的更新逻辑上面。

为了实现这一目的,我们需要从 DOM 事件(也就是按钮点击的事件)创建一个 Observable 对象。有很多种方法可以实现,但最常用的是使用 Subject 对象作为模板与组件视图逻辑之间的 桥梁 。简单来说, Subject 即是 Observer 又是 Observable 。 Observable 定义了数据流,能够发出数据; Observer 则能够订阅 Observable 并且接收数据。

好消息是,我们可以在模板的事件绑定中直接使用 Subject ,然后在事件发出时调用其 next 函数。这会产生一个特定值,广播给所有监听这个值的 Observer 对象。注意,如果 Subject 是 void 类型的,我们可以简单地忽略这个值。事实上,我们的用例就是这样子的。

让我们继续,实例化一个 Subject 对象吧。

import { Subject } from 'rxjs/Subject';

 

@Component({

  ...

})

export class JokeListComponent implements OnInit {

  update$ = new Subject();

  ...

}

下面继续,将这个值用到模板中。

  There's new data available. Click to reload the data.

  

注意我们在

这已经可以正常工作了,但只有当我们回到仪表盘,再重新进入列表视图时才是正常的。这当然不是我们所需要的。我们想要强制更新缓存数据的时候,UI 能够立即更新。

还记得我们实现了 updates$ 流,当我们点击“更新”时,会从缓存获取最新的数据?我们就需要类似这种的行为,所以我们需要扩展一下这个流。这意味着,我们需要将 update$ 和 forceReload$ 合并 起来,因为这两个流都需要更新 UI。

import { Subject } from 'rxjs/Subject';

import { merge } from 'rxjs/observable/merge';

import { mergeMap } from 'rxjs/operators';

 

@Component({

  ...

})

export class JokeListComponent implements OnInit {

  update$ = new Subject();

  forceReload$ = new Subject();

  ...

 

  ngOnInit() {

    ...

    const updates$ = merge(this.update$, this.forceReload$).pipe(

      mergeMap(() => this.getDataOnce())

    );

    ...

  }

  ...

}

这是不是很简单?是的,但我们还没完成。事实上,我们刚刚“打断”了我们的通知。这本可以正常工作,直到我们点击了“获取新笑话”。屏幕上的数据会更新,缓存中的也是,但我们等待 10 秒之后,并没有弹出通知。问题出在强制更新缓存会完成缓存实例,这意味着组件再也不会收到值。简单来说,通知流( initialNotifications$ )死了。这很不想,怎么解决这个问题呢?

很简单!我们可以监听 forceReload$ 的事件,对其每一个值都切换到一个新的通知流。重要的是,我们需要取消之前的流的订阅。听起来耳熟吗?好像我们需要 switchMap ,是不是?

让我们动手实践吧!

import { Observable } from 'rxjs/Observable';

import { Subject } from 'rxjs/Subject';

import { merge } from 'rxjs/observable/merge';

import { take, switchMap, mergeMap, skip, mapTo } from 'rxjs/operators';

 

@Component({

  ...

})

export class JokeListComponent implements OnInit {

  showNotification$: Observable;

  update$ = new Subject();

  forceReload$ = new Subject();

  ...

 

  ngOnInit() {

    ...

    const reload$ = this.forceReload$.pipe(switchMap(() => this.getNotifications()));

    const initialNotifications$ = this.getNotifications();

    const show$ = merge(initialNotifications$, reload$).pipe(mapTo(true));

    const hide$ = this.update$.pipe(mapTo(false));

    this.showNotification$ = merge(show$, hide$);

  }

 

  getNotifications() {

    return this.jokeService.jokes.pipe(skip(1));

  }

  ...

}

好了。只要 forceReload$ 一发出值,我们就取消订阅之前的 Observable ,切换到一个新的通知流。注意我们有一段代码需要两遍,也就是 this.jokeService.jokes.pipe(skip(1)) 。为了避免重复代码,我们创建一个函数 getNotifications() ,返回跳过第一个值的笑话的流。最后,我们将 initialNotifications$ 和 reload$ 合并到名为 show$ 的流。这个流负责在屏幕上显示通知。这里并不需要取消订阅 initialNotifications$ ,因为这个流在下一次订阅重新创建缓存之前就已经结束了。其余部分保持不变。

呼,我们完成了。现在花掉事件看看我们实现了什么。

在弹子图中可以看到, initialNotifications$ 对显示通知非常重要。如果我们缺失了这个流,就只能在强制更新缓存之后才会看到通知。也就是说,我们按需请求新的数据时,必须不断切换到一个新的通知流,因为之前的(旧的) Observable 对象已经完成,不会再发出值。

大功告成!我们使用 RxJS 和 Angular 提供的工具创建并实现了一个复杂的缓存机制。回顾一下,我们的服务暴露了一个笑话列表的流。底层的 HTTP 请求每 10 秒更新缓存。为了改进用户体验,我们显示一个通知,以便用户强制更新 UI。在这之上,我们还实现了一个允许用户按需请求新的数据的方法。

太棒了!这就是最终的解决方案。花几分钟检查一下代码,尝试下不同的场景,看看一切是否正常。

前景

如果你想做点作业,或者再多思考下,下面有几个可以改进的地方:

添加错误处理

重构组建中的逻辑到一个服务以便重用

特别感谢

特别感谢 Kwinten Pisman 帮助完成代码。同样,感谢 Ben Lesh 和 Brian Troncone 提供的宝贵反馈以及之处一些改进点。另外,感谢 Christoph Burgdorf 帮助复查文章和代码。

来自:https://devbean.net/2018/06/advanced-caching-with-rxjs/


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java并发编程-volatile(java并发编程的艺术 pdf)
下一篇:编写高性能 Java 代码的最佳实践
相关文章

 发表评论

暂时没有评论,来抢沙发吧~