JS Journey into outer space - Day 4

This article was written as part of a knowledge-sharing program I created for front-end developers. It builds up to creating reusable abstractions by studying async transducers.

Day 4

Now that we have modified transduce to handle async, it's time to figure out how it can handle observables.

Kickoff 🏈

First, we need to pass the onNext function, which kicks off the callback chain. In RxJS this is called subscribe, so let's rename the parameter and the callbacks to be more in line with RxJS:

function transduce(input, fn, subscribe, onError, onComplete, init) {
  subscribe(input, {
    next: (cur) => {
      init = fn(init, cur);
    },
    error: (err) => {
      onError(init, err);
    },
    complete: () => {
      onComplete(init);
    }
  });
  return init;
}
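To see the callback chain run without any async machinery, here is a minimal sketch that feeds a plain array through this transduce. The type annotations and the body of iteratorSubscribe are assumptions made for illustration: the name is used later in this post, but its implementation is not shown here.

```typescript
// Observer shape mirroring the next/error/complete callbacks used above.
interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Same logic as the transduce above, with type annotations added (assumption).
function transduce<I, T, A>(
  input: I,
  fn: (acc: A, cur: T) => A,
  subscribe: (input: I, observer: Observer<T>) => void,
  onError: (acc: A, err: unknown) => void,
  onComplete: (acc: A) => void,
  init: A
): A {
  subscribe(input, {
    next: (cur) => {
      init = fn(init, cur);
    },
    error: (err) => onError(init, err),
    complete: () => onComplete(init),
  });
  return init;
}

// Assumed implementation: walk the iterable synchronously and notify the observer.
function iteratorSubscribe<T>(input: Iterable<T>, observer: Observer<T>): void {
  try {
    for (const value of input) observer.next(value);
    observer.complete();
  } catch (err) {
    observer.error(err);
  }
}

const noop = (): void => {};

const total = transduce<number[], number, number>(
  [1, 2, 3, 4],
  (acc, cur) => acc + cur,
  iteratorSubscribe,
  noop,
  noop,
  0
);
console.log(total); // 10
```

Because the array is consumed synchronously, the returned accumulator is already complete. For promises and observables the return value has to be a container that gets filled in later.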

🦉 Observables are considered a push-based way of dealing with many values, while generators are pull-based. RxJS offers more sophisticated ways to handle such a "stream" of values, since they keep coming in whether the consumer is ready or not. Even with async generators, the code has to request and await the next value, and evaluates it once it has resolved. On the flip side, this means the consumer is in charge: you don't need to think about functionally transforming streams, you can get a single value on demand and operate on it, just like with any imperative construct. What is called "backpressure" is handled naturally, whereas with observables you would need to buffer or even drop values altogether. You can convert between generators and observables, but they generally live in different paradigms, so the choice is up to you I guess 🙃
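The difference between pull and push can be sketched without any library. The names countTo and pushTo are made up for this illustration, and the push side is a bare callback rather than a real observable.

```typescript
// Pull: the consumer decides when (and whether) the next value is produced.
let producedByPull = 0;
function* countTo(limit: number): Generator<number> {
  for (let i = 1; i <= limit; i++) {
    producedByPull++;
    yield i;
  }
}

const pulled: number[] = [];
for (const value of countTo(5)) {
  pulled.push(value);
  if (pulled.length === 2) break; // stop asking, and the generator stops producing
}

// Push: the producer decides, the consumer receives every value, ready or not.
function pushTo(limit: number, next: (value: number) => void): void {
  for (let i = 1; i <= limit; i++) next(i);
}

const buffered: number[] = [];
pushTo(5, (value) => {
  buffered.push(value); // a slow consumer would have to buffer (or drop) here
});

console.log(pulled, producedByPull); // [1, 2] 2
console.log(buffered); // [1, 2, 3, 4, 5]
```

The generator only did as much work as was asked for, while the push producer delivered everything it had.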

The subscribe function for Observable is simply a way to dispatch the subscribe method on the object (perhaps it would have been nicer if RxJS already exposed such a function, but it doesn't seem to). Note that this function returns void. At some point we might need to come back to the return value, because we may have to unsubscribe from the observable.

function observableSubscribe<T>(o: Observable<T>, s: Observer<T>) {
  o.subscribe(s);
}
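Should the return value become relevant, one option is to hand the subscription back to the caller. The sketch below is an assumption, not something this series has settled on: SubscribableLike and Unsubscribable are hand-written stand-ins for the shapes an RxJS observable and subscription have, and manualSource is a made-up source that only exists to keep the example self-contained.

```typescript
interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Stand-ins for the shape of an observable and its subscription (assumption).
interface Unsubscribable {
  unsubscribe(): void;
}
interface SubscribableLike<T> {
  subscribe(observer: Observer<T>): Unsubscribable;
}

// Variant of observableSubscribe that returns the subscription instead of void.
function observableSubscribeWithTeardown<T>(
  o: SubscribableLike<T>,
  s: Observer<T>
): Unsubscribable {
  return o.subscribe(s);
}

// Hypothetical source that emits only when told to, for demonstration purposes.
function manualSource<T>() {
  let current: Observer<T> | null = null;
  return {
    emit(value: T): void {
      current?.next(value);
    },
    subscribe(observer: Observer<T>): Unsubscribable {
      current = observer;
      return {
        unsubscribe(): void {
          current = null;
        },
      };
    },
  };
}

const source = manualSource<number>();
const seen: number[] = [];
const subscription = observableSubscribeWithTeardown(source, {
  next: (v) => {
    seen.push(v);
  },
  error: () => {},
  complete: () => {},
});

source.emit(1);
subscription.unsubscribe();
source.emit(2); // not delivered: we unsubscribed
console.log(seen); // [1]
```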

Subject

Just like the Promise case needed a Deferred, the Observable case needs an initial value that allows for "updates from the outside". In RxJS this is typically a Subject, which exposes the methods next and error, parallel to resolve and reject. It also has a complete method: because an observable can "resolve" many values, it can complete at any time. When a Subject is passed as the initial value, the handlers can simply dispatch on it:

// The accumulator is a Subject: only a Subject exposes next, error and complete.
function observableConcat<T>(a: Subject<T>, c: T) {
  a.next(c);
  return a;
}
function observableOnError<T>(a: Subject<T>, error: any) {
  a.error(error);
}
function observableOnComplete<T>(a: Subject<T>) {
  a.complete();
}
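To watch these handlers dispatch without pulling in RxJS, here is a sketch with a hand-rolled MiniSubject. It is a hypothetical stand-in that only mimics the part of Subject we rely on: forwarding values to whoever is subscribed at that moment, without replaying anything.

```typescript
interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Hypothetical stand-in for a Subject: forwards to current observers, no replay.
class MiniSubject<T> {
  private observers: Observer<T>[] = [];
  subscribe(observer: Observer<T>): void {
    this.observers.push(observer);
  }
  next(value: T): void {
    this.observers.forEach((o) => o.next(value));
  }
  error(err: unknown): void {
    this.observers.forEach((o) => o.error(err));
  }
  complete(): void {
    this.observers.forEach((o) => o.complete());
    this.observers = []; // nothing is delivered after completion
  }
}

// The three handlers from above, typed against the stand-in.
function observableConcat<T>(a: MiniSubject<T>, c: T): MiniSubject<T> {
  a.next(c);
  return a;
}
function observableOnError<T>(a: MiniSubject<T>, error: unknown): void {
  a.error(error);
}
function observableOnComplete<T>(a: MiniSubject<T>): void {
  a.complete();
}

const received: string[] = [];
const subject = new MiniSubject<number>();
subject.subscribe({
  next: (v) => {
    received.push(`next ${v}`);
  },
  error: (e) => {
    received.push(`error ${String(e)}`);
  },
  complete: () => {
    received.push("complete");
  },
});

observableConcat(observableConcat(subject, 4), 8);
observableOnComplete(subject);
observableConcat(subject, 12); // ignored: the subject has completed
console.log(received); // ["next 4", "next 8", "complete"]
```

Note that the listener subscribes before any value is pushed. With a subject that does not replay, a listener that arrives after the values have gone through sees none of them, which matters when the source emits synchronously.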

Finally we can transduce observables.

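// `from` emits synchronously, so a plain Subject would already have completed
// before we subscribe to the result. A ReplaySubject (also a Subject) replays
// the values to late subscribers.
const xform = doubleEvens(observableConcat);
const result = transduce(
  from([1, 2, 3, 4, 5, 6]),
  xform,
  observableSubscribe,
  observableOnError,
  observableOnComplete,
  new ReplaySubject<number>()
);
result.subscribe(console.log); // 4 8 12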

Ready for takeoff 🧑‍🚀

Now that most of our use cases are covered (though not yet all operations), the time has come to make the transduce function handle them all. It's possible to get rid of all the arguments that are specific to the different types, but before that let's look at their common characteristics.

At first glance all these type-specific functions seem really different, but just like the Iterable interface is available in JS (or, rather, TS), the fp-ts library has a collection of type classes ready for use. Their names look seriously alien at first: Monad, Functor, Semigroup... These names come from category theory, a field in math, and not from any practical application (also, not all modules in fp-ts expose type classes).

However, this road was created as a gentle introduction to higher and higher levels of abstraction, by focusing on what is concretely usable. So let's dive in and look at the Monoid type class.

🦉 Semigroup defines the behaviour of types that allow for concatenation, in whatever form that takes. JS types like arrays, strings and numbers all display some form of concatenation, so these types can be considered to belong to this category. Monoid extends this behaviour with an "empty" or initial value of a type. For these three types the empty array, the empty string and the number zero (with addition as the concatenation) were chosen respectively. See the relevant modules in the fp-ts documentation.
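As a minimal sketch of what that means in practice, here are three hand-written instances and a helper that folds a list of values into one. The interfaces follow the shape fp-ts uses, but the instances and the concatAll helper below are written out by hand for illustration and are not imported from the library.

```typescript
interface Semigroup<A> {
  readonly concat: (x: A, y: A) => A;
}

interface Monoid<A> extends Semigroup<A> {
  readonly empty: A;
}

// Hand-written instances (assumption: not the fp-ts exports themselves).
const stringMonoid: Monoid<string> = {
  concat: (x, y) => x + y,
  empty: "",
};

const sumMonoid: Monoid<number> = {
  concat: (x, y) => x + y,
  empty: 0,
};

function arrayMonoid<T>(): Monoid<T[]> {
  return {
    concat: (x, y) => x.concat(y),
    empty: [],
  };
}

// Fold a list of values into one, starting from the monoid's empty value.
function concatAll<A>(M: Monoid<A>, values: A[]): A {
  return values.reduce((acc, cur) => M.concat(acc, cur), M.empty);
}

console.log(concatAll(stringMonoid, ["a", "b", "c"])); // "abc"
console.log(concatAll(sumMonoid, [1, 2, 3])); // 6
console.log(concatAll(arrayMonoid<number>(), [[1], [2, 3]])); // [1, 2, 3]
console.log(concatAll(sumMonoid, [])); // 0
```

The last line shows why the empty value matters: folding nothing still gives a sensible result, which is exactly the role init plays in transduce.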

Not all implementations in fp-ts are similar (e.g. there is no Monoid for Array<unknown>), so this repo re-exposes Monoid to be readily used in the transducer functions. We already implemented Monoid behaviour for promises and observables, but we'll expose it using the correct type.

function transduceArray<T>(input: T[], fn) {
  return transduce(
    input,
    fn(array.getMonoid<T>().concat),
    iteratorSubscribe,
    noop,
    noop,
    array.getMonoid<T>().empty
  );
}

function transduceString(input: string, fn) {
  return transduce(
    input,
    fn(string.getMonoid().concat),
    iteratorSubscribe,
    noop,
    noop,
    string.getMonoid().empty
  );
}

function transducePromise<T>(input: Promise<T>, fn) {
  return transduce(
    input,
    fn(promise.getMonoid<T>().concat),
    promiseSubscribe,
    promiseOnError,
    noop,
    promise.getMonoid<T>().empty
  );
}

function transduceObservable<T>(input: Observable<T>, fn) {
  return transduce(
    input,
    fn(observable.getMonoid<T>().concat),
    observableSubscribe,
    observableOnError,
    observableOnComplete,
    observable.getMonoid<T>().empty
  );
}
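To make the role of concat and empty concrete, here is a self-contained sketch of the array case. Everything in it is an assumption for illustration: doubleEvens is reconstructed from its name and its output above, arrayAppendMonoid is a hand-rolled stand-in whose concat appends a single element (the repo's re-exposed Monoid may be shaped differently), and a plain loop stands in for transduce with iteratorSubscribe.

```typescript
// A step (reducer) takes the accumulator and the current value.
type Step<A, T> = (acc: A, cur: T) => A;

// Assumed shape of the transducer from earlier days: keep evens, then double them.
function doubleEvens<A>(step: Step<A, number>): Step<A, number> {
  return (acc, cur) => (cur % 2 === 0 ? step(acc, cur * 2) : acc);
}

// Hand-rolled stand-in: empty is a fresh array, concat appends one element.
function arrayAppendMonoid<T>() {
  return {
    empty: [] as T[],
    concat: (acc: T[], cur: T): T[] => acc.concat([cur]),
  };
}

function transduceArray(
  input: number[],
  fn: (step: Step<number[], number>) => Step<number[], number>
): number[] {
  const M = arrayAppendMonoid<number>();
  const step = fn(M.concat); // the monoid's concat becomes the final step
  let acc = M.empty; // the monoid's empty becomes the initial value
  for (const cur of input) acc = step(acc, cur); // stand-in for transduce
  return acc;
}

console.log(transduceArray([1, 2, 3, 4, 5, 6], doubleEvens)); // [4, 8, 12]
```

The transducer itself never mentions arrays. Only the two monoid members tie the result to a concrete type, which is what makes it possible to swap in a string, a promise or an observable.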

All that is needed now is a type class that expresses the subscribe, onError and onComplete behaviour (if any) and we can have a stab at creating the input type for transduce:

// required typeclasses for the types implementing `transduce`
interface Semigroup<A> {
  readonly concat: (x: A, y: A) => A
}

interface Monoid<A> extends Semigroup<A> {
  readonly empty: A
}

interface Observer<T> {
  next: (value: T) => void;
  error: (err: any) => void;
  complete: () => void;
}

interface Subscribable<A> {
  subscribe: <T>(a: A, o: Observer<T>) => void;
}

interface Failable<A> {
  onError: (a: A, err: unknown) => void;
}

interface Completable<A> {
  onComplete: (a: A) => void;
}

interface Transducable<A, B = A, T = unknown> {
  transduce(
    input: A,
    // the accumulator has the type of `init` (B), not of the input (A)
    fn: (acc: B, cur: T) => B,
    subscribe: (a: A, o: Observer<T>) => void,
    onError: (acc: B, err: unknown) => void,
    onComplete: (acc: B) => void,
    init: B
  ): B;
}
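One possible direction, sketched here under assumptions and not necessarily where this series ends up, is to bundle these behaviours into a single record per type and let one function take it from there. The names TransduceInstance, transduceWith and shout are hypothetical, and the example only covers the synchronous string case.

```typescript
interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// Hypothetical record bundling everything transduce needs for one type.
interface TransduceInstance<A, T> {
  empty: A;
  concat: (acc: A, cur: T) => A;
  subscribe: (input: A, observer: Observer<T>) => void;
  onError: (acc: A, err: unknown) => void;
  onComplete: (acc: A) => void;
}

// Instance for strings: the input is walked character by character.
const stringInstance: TransduceInstance<string, string> = {
  empty: "",
  concat: (acc, cur) => acc + cur,
  subscribe: (input, observer) => {
    for (const ch of input.split("")) observer.next(ch);
    observer.complete();
  },
  onError: () => {},
  onComplete: () => {},
};

// The type-specific arguments are gone: they all come from the instance.
function transduceWith<A, T>(
  I: TransduceInstance<A, T>,
  input: A,
  fn: (step: (acc: A, cur: T) => A) => (acc: A, cur: T) => A
): A {
  const step = fn(I.concat);
  let acc = I.empty;
  I.subscribe(input, {
    next: (cur) => {
      acc = step(acc, cur);
    },
    error: (err) => I.onError(acc, err),
    complete: () => I.onComplete(acc),
  });
  return acc;
}

// Example transducer (made up): drop spaces, upper-case everything else.
function shout<A>(step: (acc: A, cur: string) => A) {
  return (acc: A, cur: string): A =>
    cur === " " ? acc : step(acc, cur.toUpperCase());
}

console.log(transduceWith(stringInstance, "to the moon", shout)); // "TOTHEMOON"
```

For promises and observables the accumulator would be a Deferred or a Subject rather than the input type itself, so a real version needs a second type parameter, much like B in Transducable.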

There are some things to iron out, but Rome wasn't built in four days.
