Abstraction in F[_]: Abstract your Types

A Pipeline of Stream from BigInt to String

Say we have a  data pipeline:

trait Pipeline{
  final def apply(uri: URI): Stream[Unit] =
    write(computation(read(uri)))
  def read(uri: URI): Stream[BigInt]
  def computation(in: Stream[BigInt]): Stream[String]
  def write(in: Stream[String]): Stream[Unit]
}

The limitations here are many. The most important limitation is this only works for data pipelines your team can model as streams of an input to a BigInt to a String to an output. This is not very useful. The first step is abstracting over your computation types.

A Pipeline of Stream

Removing the constraint on BigInt and String requires type parameters on our Pipeline trait:

trait Pipeline[A, B]{
  final def apply(uri: URI): Stream[Unit] =
    write(computation(read(uri)))
  def read(uri: URI): Stream[A]
  def computation(in: Stream[A]): Stream[B]
  def write(in: Stream[B]): Stream[Unit]
}

We have gained a bit of freedom in implementation. We can now write Pipelines that can be modeled as streams of an input to a A to a B to an output given any A and B. Now instead of being constrained to BigInt and String, we have gained some liberty through our abstraction.

Still, we are constrained to the scala Stream type. This, too, is a nuisance what if we require Pipelines that effect through fs2 Stream or spark Dataset or any other suitable effect? Similar to how we abstracted away from BigInt and String by making them type parameters A and B, we can do the same with our Stream.

A Pipeline

Using a higher-kinded type parameter, we can abstract over any effect assuming the effect has a single type parameter.

trait Pipeline[F[_], A, B]{
  final def apply(uri: URI): F[Unit] =
    write(computation(read(uri)))
  def read(uri: URI): F[A]
  def computation(in: F[A]): F[B]
  def write(in: F[B]): F[Unit]
}

Now, we can make a data Pipeline using any such types! We can have our original Pipeline of Stream from BigInt to String

val pipeline: Pipeline[Stream, BigInt, String] = ???

We can have a Pipeline of fs2 Stream with some type construction:

type MyStream[A] = fs2.Stream[fs2.Task, A]
val pipeline: Pipeline[MyStream, BigInt, String] = ???

We can even do this with spark

val pipeline: Pipeline[Dataset, BigInt, String] = ???

Any Pipeline your team can model as a read effect a computation and a write effect can be defined with Pipeline defined this way.