ReactiveX es una API para la programación asíncrona con Observables. Su implementación en Javascript es RxJS y en Angular (no sé en otros frameworks), se usa y se usa mucho, no puedes pasarlo por alto. Este post (al que luego volveré a hacer mención) empieza con un párrafo bastante lapidario "Like it or not, rxjs is a critical component of modern Angular development. Although it is perfectly possible to use Angular 2+ without using observables, you lose out on an enormous amount of functionality. The reactive pattern is extremely powerful, and once you get over the, admittedly rather high, learning curve the grass is definitely greener on the other side.". Pues así me siento yo, en mi opinión, Angular es un framework con una curva de aprendizaje medio-alta, si a eso le sumamos Redux, en su sabor Angular vía NgRx, la cosa se complica un poco/bastante más, pero todo tiene un denominador común... RxJS. De hecho, el lema de NgRx es "Reactive State for Angular", y Reactive es sinónimo de RxJS. Queda claro que RxJS debe ser importante.
En este sentido, hay un concepto de RxJS que me parece especialmente importante y son los high-order Observables. Normalmente, trabajamos con Observables que emiten valores de tipos básicos (strings, numbers, tipos de usuario, etc.), a estos se les llama first-order Observables, pero si un Observable emite, a su vez, Observables, estamos hablando de high-order Observables (es igual que cuando tenemos una función que recibe o devuelve otra función y hablamos de high-order functions, de ahí habrán sacado el nombre, digo yo).
Llegar a encontrarse en el código high-order Observables no es excepcional, bastaría un ejemplo sencillo como el siguiente, donde en la suscripción recibimos un Observable
import { Observable, of } from "rxjs"; import { map } from "rxjs/operators"; of(1, 2, 3) .pipe( map<number, Observable<number>>((n: number) => of(n * n)) ) .subscribe((n: Observable<number>) => console.log(n)); // Observable { _isScalar: true, _subscribe: [Function], value: 1 } // Observable { _isScalar: true, _subscribe: [Function], value: 4 } // Observable { _isScalar: true, _subscribe: [Function], value: 9 }
Podemos resolverlo creando una suscripción anidada, pero es un anti-pattern. Lo es porque perdemos el control de cuando cancelar la suscripción anidada y además recuerda (sospechosamente) al famoso call-back hell. Si te encuentras dos susbcribe anidados, tienes un problema. Y no lo digo yo, lo dice Deborah Kurata, que sabe bastante más que yo.
import { Observable, of } from "rxjs"; import { map } from "rxjs/operators"; of(1, 2, 3) .pipe( map<number, Observable<number>>((n: number) => of(n * n)) ) .subscribe((n: Observable<number>) => n.subscribe((i: number) => { console.log(i); }) ); // 1 // 4 // 9
Y ¡ojo!, no vale hace trampas, aunque no tengamos dos subscribe juntos, hay otras formas más enrevesadas de tener nested suscriptions.
import { Observable, of } from "rxjs"; import { map, tap } from "rxjs/operators"; of(1, 2, 3) .pipe( map<number, Observable<number>>((n: number) => of(n * n)), tap<Observable<number>>((o: Observable<number>) => { o.subscribe((i: number) => { console.log(`inner ${i}`); }); }) ) .subscribe((n: Observable<number>) => console.log(`outer ${n}`)); // inner 1 // outer [object Object] // inner 4 // outer [object Object] // inner 9 // outer [object Object]
He aprovechado este último ejemplo para introducir dos nuevas palabras que es importante tener en cuenta, source u outer Observable e inner Observable. El Observable al que nos suscribimos se llama outer Observable y, a los Observables que emite (por eso un high-order Observable) se les llama inner Observable.
También quiero aprovechar a hacer el disclaimer, de que en este post seré muy verbose con la firma de los métodos porque es una manía que tengo cuando estoy aprendiendo, es decir, lo anterior es equivalente a este otro código:
of(1, 2, 3) .pipe( map(n => of(n * n)), tap(o => { o.subscribe((i: number) => { console.log(`inner ${i}`); }); }) ) .subscribe(n => console.log(`outer ${n}`));
Volviendo al problema original, ¿cómo trabajar entonces con high-order Observables? Pues hay que convertir un high-order Observable en un first-order Observable. Eso se hace con flattenig (aplastamiento, su traducción más o menos acertada al español). Estos operadores permitirán que consumamos los inner Observable como tipos básicos y, además, y esto es muy importante, gestionarán de forma automática la suscripción y cancelación al inner Observable.
Por ejemplo, con concatAll podemos resolver el problema inicial.
import { Observable, of } from "rxjs"; import { map, concatAll } from "rxjs/operators"; of(1, 2, 3) .pipe( map<number, Observable<number>>((n: number) => of(n * n)), concatAll<number>(), ) .subscribe((n: number) => console.log(n)); // 1 // 4 // 9
Y podemos hacerlo un poco mejor, si usamos el operador concatMap, que es la suma de map y concatAll (al igual que sucede por ejemplo en vanilla Javascript con map y flat, que se combinan en flatMap)
import { Observable, of } from "rxjs"; import { map, concatAll, concatMap } from "rxjs/operators"; of(1, 2, 3) .pipe( concatMap<number, number>((n: number) => of<number>(n * n)) ) .subscribe((n: number) => console.log(n)); // 1 // 4 // 9
Amigo de concatMap, tenemos también a switchMap (un clásico), mergeMap y exhaustMap.
Lo más importante de entender es que todos estos operadores responden a la misma pregunta ¿Qué hacer si el outer Observable vuelve a emitir y el inner Observable todavía está trabajando? Es decir, ¿Qué hacer si se solapan emisiones del outer Observable con el inner Observable? Si pasa el suficiente tiempo entre emisiones del outer Observable como para que la suscripción y cancelación del inner Observable ya haya acabado, podemos poner lo que queramos, da igual, se va a comportar de la misma forma. Sin embargo, si el inner Observable todavía esta trabajando ¿qué hacer con el trabajo actual del inner? Aquí es donde el post que mencionaba al principio http://alanpryorjr.com/2019-05-15-rxjs-flattening-operators/ me parece genial porque (sin código, importante y felicito por ello al autor) explica con una analogía de jefe-empleado como se comportará para operador de los mencionados en el hipotético caso de que un jefe (outer Observable) manda tareas (emite) a un empleado (flatennig operator) que todavía no ha acabado la anterior (inner Observable).
Si lo llevamos a un escenario más concreto, como llamar a una API si el usuario hace click en un botón (y asumiendo no hemos tenido a bien, deshabilitar el botón después de un click, que sería lo suyo), tenemos lo siguiente:
- switchMap. Cancela petición en curso y vuelve a llamar a la API.
- concatMap. Cuando acabe la petición en curso, hará otra llamada. Es una cola. Ademas, se respetará el orden y no llamada hasta no acabar con la anterior (esto es importante porque queremos garantizar que en el back se procesen en orden, que se hagan las peticiones desde cliente en orden no garantiza que en el back se procesen en el mismo orden, por eso se espera a que termine una petición para lanzar la siguiente).
- mergeMap. A la vez que la petición en curso, se lanzará una nueva llamada, en paralelo.
- exhaustMap. No hará nada si hay una petición en curso.
En este ejemplo https://stackblitz.com/edit/angular-nkgfyr está recogido lo anterior, y usando https://www.mocky.io/ podemos simular un delay para forzar a que el inner Observable esté trabajando cuando volvamos a hacer click en el botón. Es muy importante tener abierta la pestaña network de las developer tools para ver como se cancelan las peticiones en curso en función del operador elegido. En el ejemplo, se usan switchMapTo, concatMapTo y mergeMapTo, que son iguales a sus versiones sin "To", sólo que no necesitan un parámetro de entrada.
Llegando al final y, puesto que hemos cancelado peticiones, ¿Qué podemos hacer en el back para aprovecharnos de este comportamiento? Pues usar CancellationToken y propagarlo en todos los métodos asíncronos que lo permitan (por ejemplo, EF, Dapper, MediatR, etc.). Porque es muy bonito que el cliente cancele una petición (le honra), pero si en back seguimos procesando las peticiones, sólo uno estará haciendo lo correcto, el otro seguirá sin percatarse de que está trabajando para nada y que ya a nadie le importa el resultado (triste pero cierto).
En ASP.NET Core (y haciendo una API identica a la que hemos consumido anteriormente), simplemente añadiendo el parámetro CancellationToken al método de acción y pasándoselo a Task.Delay hará la magia de lanzar una excepción del tipo System.Threading.Tasks.TaskCanceledException si el cliente cancela la petición.
[HttpGet] public async Task<ActionResult<IEnumerable<User>>> Get(CancellationToken cancellationToken, int delay = 0) { await Task.Delay(delay, cancellationToken); return new[] { new User() {Id = 1, Name = "Sergio", Email = "panicoenlaxbox@gmail.com"}, new User() {Id = 2, Name = "Carmen", Email = "panicoenel20@gmail.com"} }; }
Un saludo!
Este comentario ha sido eliminado por el autor.
ResponderEliminar