Abstraction in F[_]: Lift Implementation Details Outside the Class

We now know that we can lift implementation details outside the class to gain some freedom in the client code; we have gained abstraction through decoupling. Why not apply a similar abstraction to the rest of the code?

import java.net.URI
import cats.Functor

trait Pipeline[F[_], A, B]{
  final def apply(uri: URI)(implicit F: Functor[F]): F[Unit] = {
    val in = read(uri)
    val computed = F.map(in)(computation)
    F.map(computed)(write)
  }
  def read(uri: URI): F[A]
  def computation(in: A): B
  def write(in: B): Unit
}

And we can abstract it into:

trait Read[F[_], A] extends Function1[URI, F[A]]
trait Computation[A, B] extends Function1[A, B]
trait Write[B] extends Function1[B, Unit]

trait Pipeline[F[_], A, B]{
  // read is now supplied implicitly too, since the trait no longer defines it
  final def apply(uri: URI)(implicit
      F: Functor[F],
      read: Read[F, A],
      computation: Computation[A, B],
      write: Write[B]): F[Unit] = {
    val in = read(uri)
    val computed = F.map(in)(computation)
    F.map(computed)(write)
  }
}
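A minimal runnable sketch of this step, under a few assumptions: a tiny local Functor trait stands in for cats.Functor so there are no dependencies, List replaces Stream, and readInts, label and collect are hypothetical stage instances. Every stage is picked up from implicit scope when apply is called.

```scala
import java.net.URI
import scala.collection.mutable.ListBuffer

object ImplicitStages {
  // Minimal stand-in for cats.Functor (same curried map signature).
  trait Functor[F[_]] {
    def map[A, B](fa: F[A])(f: A => B): F[B]
  }
  object Functor {
    implicit val list: Functor[List] = new Functor[List] {
      def map[A, B](fa: List[A])(f: A => B): List[B] = fa.map(f)
    }
  }

  trait Read[F[_], A] extends Function1[URI, F[A]]
  trait Computation[A, B] extends Function1[A, B]
  trait Write[B] extends Function1[B, Unit]

  trait Pipeline[F[_], A, B] {
    // Every stage is resolved from implicit scope at the call site.
    final def apply(uri: URI)(implicit
        F: Functor[F],
        read: Read[F, A],
        computation: Computation[A, B],
        write: Write[B]): F[Unit] = {
      val in = read(uri)
      val computed = F.map(in)(computation)
      F.map(computed)(write)
    }
  }

  // Hypothetical stage instances, for illustration only.
  val written = ListBuffer.empty[String]

  // Reads comma-separated integers from the URI query, e.g. "?1,2,3".
  implicit val readInts: Read[List, Int] = new Read[List, Int] {
    def apply(uri: URI): List[Int] = uri.getQuery.split(",").toList.map(_.toInt)
  }
  implicit val label: Computation[Int, String] = new Computation[Int, String] {
    def apply(in: Int): String = s"#$in"
  }
  implicit val collect: Write[String] = new Write[String] {
    def apply(in: String): Unit = written += in
  }
}
```

Calling `new Pipeline[List, Int, String] {}` on `file:///data?1,2,3` writes "#1", "#2" and "#3". One caveat worth knowing: in Scala 2, an implicit value whose type extends Function1 also acts as an implicit conversion wherever it is in scope, so instances like these are best kept in narrow scopes.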

While we're at it, let's abstract out the apply method as well.

sealed trait Pipeline[F[_], A, B]{
  def apply(uri: URI): F[Unit]
}
object Pipeline{
  // Scala 2 does not allow a context bound alongside an explicit implicit
  // parameter list, so the Functor is requested in the list itself.
  final def apply[F[_], A, B](implicit
      F: Functor[F],
      read: Read[F, A],
      computation: Computation[A, B],
      write: Write[B]): Pipeline[F, A, B] =
    new Pipeline[F, A, B]{
      override def apply(uri: URI): F[Unit] = {
        val in = read(uri)
        val computed = F.map(in)(computation)
        F.map(computed)(write)
      }
    }
}
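A sketch of the companion-object constructor, again with a local Functor standing in for cats and with hypothetical stages (readPath, pathLength, record) that use Option as the effect. The instances are resolved once, when `Pipeline[Option, String, Int]` is built, so the caller only ever sees `URI => F[Unit]`.

```scala
import java.net.URI
import scala.collection.mutable.ListBuffer

object SmartConstructor {
  // Minimal stand-in for cats.Functor.
  trait Functor[F[_]] {
    def map[A, B](fa: F[A])(f: A => B): F[B]
  }
  object Functor {
    implicit val option: Functor[Option] = new Functor[Option] {
      def map[A, B](fa: Option[A])(f: A => B): Option[B] = fa.map(f)
    }
  }

  trait Read[F[_], A] extends Function1[URI, F[A]]
  trait Computation[A, B] extends Function1[A, B]
  trait Write[B] extends Function1[B, Unit]

  sealed trait Pipeline[F[_], A, B] {
    def apply(uri: URI): F[Unit]
  }
  object Pipeline {
    // Stages are captured when the Pipeline is built, not on every call.
    def apply[F[_], A, B](implicit
        F: Functor[F],
        read: Read[F, A],
        computation: Computation[A, B],
        write: Write[B]): Pipeline[F, A, B] =
      new Pipeline[F, A, B] {
        override def apply(uri: URI): F[Unit] =
          F.map(F.map(read(uri))(computation))(write)
      }
  }

  // Hypothetical stage instances, for illustration only.
  val sink = ListBuffer.empty[Int]
  implicit val readPath: Read[Option, String] = new Read[Option, String] {
    def apply(uri: URI): Option[String] = Option(uri.getPath)
  }
  implicit val pathLength: Computation[String, Int] = new Computation[String, Int] {
    def apply(in: String): Int = in.length
  }
  implicit val record: Write[Int] = new Write[Int] {
    def apply(in: Int): Unit = sink += in
  }
}
```

Because the trait is sealed, the companion's `apply` is the only way to obtain a Pipeline, which keeps construction in one place.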

Now our trait is a simple expression of inputs to outputs. But why would we ever perform this kind of abstraction? It seems like we are complicating the problem, not making it simpler.

The benefits here are more subtle and only appear in certain use cases. Say you have two pipelines:

val p1: Pipeline[Stream, Int, String] = ???
val p2: Pipeline[Stream, String, Array[Char]] = ???

And you want to combine them into a single pipeline:

val pipeline: Pipeline[Stream, Int, Array[Char]] = ???

Taking two Pipeline instances and composing them is not a readily understood idea. However, taking two Function1 instances and composing them is very well understood. Notice we brought the functions read, computation and write outside the class as simple Function1 instances. Abstracting the Pipeline functions outside the trait gives the developer writing the client code a clear, well-understood method for composing multiple Pipelines.
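Composing the computation steps of p1 and p2 is then ordinary function composition with `andThen`. A minimal sketch, assuming a hypothetical `computation` helper that wraps a plain function; note it composes only the computation steps, not whole Pipelines.

```scala
object ComposeComputations {
  trait Computation[A, B] extends Function1[A, B]

  // Hypothetical helper that wraps a plain function as a Computation.
  def computation[A, B](f: A => B): Computation[A, B] = new Computation[A, B] {
    def apply(a: A): B = f(a)
  }

  // The computation steps of p1 (Int => String) and p2 (String => Array[Char]).
  val c1: Computation[Int, String] = computation((i: Int) => i.toString)
  val c2: Computation[String, Array[Char]] = computation((s: String) => s.toCharArray)

  // Function1#andThen glues them into one Int => Array[Char] step.
  val c12: Computation[Int, Array[Char]] = computation(c1 andThen c2)
}
```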

This is still an incomplete implementation. We can see a path forward for composing any number of Pipelines whose computations can be composed, but how do we compose Pipelines that accept different inputs?

A simple switching mechanism

Say we have three Pipeline instances which require separate inputs.

val p1: Pipeline[Stream, ...] = ...
val p2: Pipeline[Stream, ...] = ...
val p3: Pipeline[Stream, ...] = ...

Our application would need to accept a URI and choose which pipeline (if any) should run it.

def perform(uri: URI): Stream[Unit] = {
  if(uri.toString.contains("p1")) p1(uri)
  else if(uri.toString.contains("p2")) p2(uri)
  else if(uri.toString.contains("p3")) p3(uri)
  else Stream()
}

This is a lot of boilerplate, especially when you consider that the number of Pipelines (for any successful business) is expected to grow. Let's unpack what we have and see whether we can abstract it into our Pipeline definition.

1. Uniform input: URI is the input to ALL Pipeline instances
2. Guards: checking a URI against some constant
3. Constants: a value identifying a Pipeline for use in a Guard
4. Default: the case where the input matches no Pipeline

Our uniform input means we don't have to worry about which Pipeline can take what types of values. This is already abstract enough.

We'll build a typeclass to model Guards and Constants associated with each pipeline.

trait Guard[-T]{
  def name: String
}
sealed trait Pipeline[-T, F[_], A, B]{
  def apply(uri: URI): F[Unit]
}
object Pipeline{
  final def apply[T, F[_], A, B](implicit
      G: Guard[T],
      F: Functor[F],
      read: Read[F, A],
      computation: Computation[A, B],
      write: Write[B]): Pipeline[T, F, A, B] =
    new Pipeline[T, F, A, B]{
      override def apply(uri: URI): F[Unit] = ???
    }
}

We have an issue here. The last else case of our perform function returns an empty Stream, but inside the Pipeline object we don't know what our effect type is, so we cannot return an empty version of it. This problem takes me back to a talk given by Runar Bjarnason last year in which he describes how, when we liberate our types, we constrain our implementation, and when we constrain our types, we liberate our implementation. We have liberated all of our types here (except URI), leaving ourselves no room to implement what we need. So we need to constrain a type in order to regain the ability to implement our function. Let's constrain our output type.

trait Guard[-T]{
  def name: String
}
sealed trait Pipeline[-T, F[_], A, B]{
  type Out
  def apply(uri: URI): Out
}
object Pipeline{
  // The refinement keeps Out visible to callers, so p1(uri) is known to be an Either.
  final def apply[T, F[_], A, B](implicit
      G: Guard[T],
      F: Functor[F],
      read: Read[F, A],
      computation: Computation[A, B],
      write: Write[B]): Pipeline[T, F, A, B]{ type Out = Either[Unit, F[Unit]] } =
    new Pipeline[T, F, A, B]{
      type Out = Either[Unit, F[Unit]]
      override def apply(uri: URI): Out = {
        val from = uri.toString
        if(from.contains(G.name)) Right{
          val in = read(uri)
          val computed = F.map(in)(computation)
          F.map(computed)(write)
        } else Left(())
      }
    }
}
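A self-contained sketch of the guarded constructor under stated assumptions: a local Functor stands in for cats, plain functions replace the Read/Computation/Write type classes to keep it short, List replaces Stream, and P1, P2 and segments are hypothetical. A URI that does not contain a pipeline's name yields `Left(())` without running any step.

```scala
import java.net.URI
import scala.collection.mutable.ListBuffer

object GuardedPipeline {
  // Minimal stand-in for cats.Functor.
  trait Functor[F[_]] {
    def map[A, B](fa: F[A])(f: A => B): F[B]
  }
  object Functor {
    implicit val list: Functor[List] = new Functor[List] {
      def map[A, B](fa: List[A])(f: A => B): List[B] = fa.map(f)
    }
  }

  trait Guard[-T] { def name: String }

  sealed trait Pipeline[-T, F[_], A, B] {
    type Out
    def apply(uri: URI): Out
  }
  object Pipeline {
    // Guard check is the same substring test used in the text.
    def apply[T, F[_], A, B](read: URI => F[A], computation: A => B, write: B => Unit)(implicit
        G: Guard[T],
        F: Functor[F]): Pipeline[T, F, A, B] { type Out = Either[Unit, F[Unit]] } =
      new Pipeline[T, F, A, B] {
        type Out = Either[Unit, F[Unit]]
        def apply(uri: URI): Out =
          if (uri.toString.contains(G.name)) Right(F.map(F.map(read(uri))(computation))(write))
          else Left(())
      }
  }

  trait P1
  trait P2
  implicit val guardP1: Guard[P1] = new Guard[P1] { def name: String = "p1" }
  implicit val guardP2: Guard[P2] = new Guard[P2] { def name: String = "p2" }

  // Hypothetical reader: the non-empty segments of the URI path.
  val segments: URI => List[String] = uri => uri.getPath.split("/").toList.filter(_.nonEmpty)
  val written = ListBuffer.empty[String]

  val p1 = Pipeline[P1, List, String, String](segments, _.toUpperCase, s => written += s)
  val p2 = Pipeline[P2, List, String, Int](segments, _.length, n => written += n.toString)
}
```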

So our client code becomes:

trait P1
trait P2
trait P3
implicit val guardP1: Guard[P1] = new Guard[P1]{
  final override def name: String = "p1"
}
implicit val guardP2: Guard[P2] = new Guard[P2]{
  final override def name: String = "p2"
}
implicit val guardP3: Guard[P3] = new Guard[P3]{
  final override def name: String = "p3"
}
implicit def p1: Pipeline[P1, Stream, ...]
implicit def p2: Pipeline[P2, Stream, ...]
implicit def p3: Pipeline[P3, Stream, ...]
def perform(uri: URI): Either[Either[Either[Unit, Stream[Unit]], Stream[Unit]], Stream[Unit]] = {
  p1(uri).fold(
    _ => Left(p2(uri).fold(
      _ => Left(p3(uri).fold(
        _ => Left(()),
        a => Right(a)
      )),
      a => Right(a)
    )),
    a => Right(a)
  )
}

This has made things much worse. There is even more boilerplate, and the nesting will become unreasonable in short order. But we now have something we can easily reason about at the type level:

• We are given a known set of Pipeline instances
• We create a computation that runs at most one Pipeline
• The result is a nested data structure

These characteristics indicate we can take an inductive approach to building our Pipeline library. Enter Shapeless.

Abstraction in F[_]: Abstract Your Functions

We have a reasonably abstract pipeline:

import java.net.URI

trait Pipeline[F[_], A, B]{
  final def apply(uri: URI): F[Unit] = write(computation(read(uri)))
  def read(uri: URI): F[A]
  def computation(in: F[A]): F[B]
  def write(in: F[B]): F[Unit]
}

Recognizing Higher-Kinded Duplication

Taking a close look at the trait, we see that the computation and write functions have the same shape aside from their type parameters. In fact, if we give them the same name, the compiler complains:

scala> :paste
// Entering paste mode (ctrl-D to finish)
trait Pipeline[F[_], A, B]{
def perform(in: F[A]): F[B]
def perform(in: F[B]): F[Unit]
}
// Exiting paste mode, now interpreting.
<console>:9: error: double definition:
method perform:(in: F[B])F[Unit] and
method perform:(in: F[A])F[B] at line 8
have same type after erasure: (in: Object)Object
def perform(in: F[B]): F[Unit]

Since these have the same shape, we can build an abstraction to simplify our API even further.

trait Pipeline[F[_], A, B]{
  final def apply(uri: URI): F[Unit] = {
    val in = read(uri)
    // convert takes both arguments in one parameter list
    val computed = convert(in, computation)
    convert(computed, write)
  }
  def read(uri: URI): F[A]
  def computation(in: A): B
  def write(in: B): Unit
  def convert[U, V](in: F[U], f: U => V): F[V]
}
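To see that only `convert` depends on the effect, here is a sketch with List as F. The pipeline `segmentLengths` and its steps are hypothetical; read, computation and write are plain code, and only `convert` reaches inside the List.

```scala
import java.net.URI
import scala.collection.mutable.ListBuffer

object ConvertPipeline {
  trait Pipeline[F[_], A, B] {
    final def apply(uri: URI): F[Unit] = {
      val in = read(uri)
      val computed = convert(in, computation)
      convert(computed, write)
    }
    def read(uri: URI): F[A]
    def computation(in: A): B
    def write(in: B): Unit
    def convert[U, V](in: F[U], f: U => V): F[V]
  }

  val written = ListBuffer.empty[String]

  // Hypothetical pipeline: labels each path segment with its length.
  object segmentLengths extends Pipeline[List, String, String] {
    def read(uri: URI): List[String] = uri.getPath.split("/").toList.filter(_.nonEmpty)
    def computation(in: String): String = s"$in:${in.length}"
    def write(in: String): Unit = written += in
    def convert[U, V](in: List[U], f: U => V): List[V] = in.map(f)
  }
}
```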

We've removed the need for the developer to understand the effect type in order to reason about a computation or write step. Now, let's focus on this new function:

def convert[U, V](in: F[U], f: U => V): F[V]

This is super abstract; so abstract, in fact, that it is meaningless without context. I am reminded of a video in which Rob Norris explains how he kept abstracting his database code until some mathematical principles emerged from the work. In that talk, he points out that any time he writes something sufficiently abstract, he checks a library for it, since he has probably not discovered some new basic principle of mathematics himself. We do the same here.

Inside the cats library we find the following def within the Functor type class:

def map[A, B](fa: F[A])(f: A => B): F[B]

This is our convert function, only curried rather than taking both arguments in a single parameter list. We replace our custom function with the one from the library, since a developer is more likely to be familiar with cats than with our internal library. (post on implicits and type classes)

import java.net.URI
import cats.Functor

trait Pipeline[F[_], A, B]{
  final def apply(uri: URI)(implicit F: Functor[F]): F[Unit] = {
    val in = read(uri)
    val computed = F.map(in)(computation)
    F.map(computed)(write)
  }
  def read(uri: URI): F[A]
  def computation(in: A): B
  def write(in: B): Unit
}

Here we were able to replace an internal (thus constrained) implementation detail with an external (thus liberated) argument. In other words, we have lifted an implementation detail outside our class, giving the client code the freedom to use the same instantiated Pipeline in a multitude of contexts.
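A dependency-free sketch of the Functor-based Pipeline. The local Functor trait mirrors the curried cats signature but is a stand-in, not cats itself, and allSegments and hostOnly are hypothetical pipelines. The Functor instance is found at the call site, so the trait never mentions List or Option.

```scala
import java.net.URI
import scala.collection.mutable.ListBuffer

object FunctorPipeline {
  // Local stand-in for cats.Functor (same curried map signature).
  trait Functor[F[_]] {
    def map[A, B](fa: F[A])(f: A => B): F[B]
  }
  object Functor {
    implicit val list: Functor[List] = new Functor[List] {
      def map[A, B](fa: List[A])(f: A => B): List[B] = fa.map(f)
    }
    implicit val option: Functor[Option] = new Functor[Option] {
      def map[A, B](fa: Option[A])(f: A => B): Option[B] = fa.map(f)
    }
  }

  trait Pipeline[F[_], A, B] {
    final def apply(uri: URI)(implicit F: Functor[F]): F[Unit] = {
      val in = read(uri)
      val computed = F.map(in)(computation)
      F.map(computed)(write)
    }
    def read(uri: URI): F[A]
    def computation(in: A): B
    def write(in: B): Unit
  }

  val written = ListBuffer.empty[String]

  // Hypothetical pipelines: identical steps, different effect types.
  object allSegments extends Pipeline[List, String, Int] {
    def read(uri: URI): List[String] = uri.getPath.split("/").toList.filter(_.nonEmpty)
    def computation(in: String): Int = in.length
    def write(in: Int): Unit = written += s"list:$in"
  }
  object hostOnly extends Pipeline[Option, String, Int] {
    def read(uri: URI): Option[String] = Option(uri.getHost) // null host becomes None
    def computation(in: String): Int = in.length
    def write(in: Int): Unit = written += s"option:$in"
  }
}
```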

Abstraction in F[_]: Abstract your Types

A Pipeline of Stream from BigInt to String

Say we have a data pipeline:

import java.net.URI

trait Pipeline{
  final def apply(uri: URI): Stream[Unit] = write(computation(read(uri)))
  def read(uri: URI): Stream[BigInt]
  def computation(in: Stream[BigInt]): Stream[String]
  def write(in: Stream[String]): Stream[Unit]
}

The limitations here are many. The most important is that this only works for data pipelines your team can model as a stream from an input, to a BigInt, to a String, to an output. That is not very useful. The first step is abstracting over the computation types.

A Pipeline of Stream

Removing the constraint on BigInt and String requires type parameters on our Pipeline trait:

trait Pipeline[A, B]{
  final def apply(uri: URI): Stream[Unit] = write(computation(read(uri)))
  def read(uri: URI): Stream[A]
  def computation(in: Stream[A]): Stream[B]
  def write(in: Stream[B]): Stream[Unit]
}

We have gained a bit of freedom in implementation. We can now write any Pipeline that can be modeled as a stream from an input, to an A, to a B, to an output, for any A and B. Instead of being constrained to BigInt and String, we have gained some liberty through our abstraction.

Still, we are constrained to the Scala Stream type. This, too, is a nuisance: what if we require Pipelines whose effects run through an fs2 Stream, a Spark Dataset, or any other suitable effect type? Just as we abstracted away from BigInt and String by making them type parameters A and B, we can do the same with Stream.

A Pipeline

Using a higher-kinded type parameter, we can abstract over any effect, assuming the effect has a single type parameter.

trait Pipeline[F[_], A, B]{
  final def apply(uri: URI): F[Unit] = write(computation(read(uri)))
  def read(uri: URI): F[A]
  def computation(in: F[A]): F[B]
  def write(in: F[B]): F[Unit]
}
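A sketch of this higher-kinded trait with two different effects. List stands in for Stream (which Scala 2.13 deprecates in favor of LazyList), and powersOfTwo and firstSegment are hypothetical pipelines chosen only for illustration.

```scala
import java.net.URI
import scala.collection.mutable.ListBuffer

object HigherKindedPipeline {
  trait Pipeline[F[_], A, B] {
    final def apply(uri: URI): F[Unit] = write(computation(read(uri)))
    def read(uri: URI): F[A]
    def computation(in: F[A]): F[B]
    def write(in: F[B]): F[Unit]
  }

  val written = ListBuffer.empty[String]

  // Hypothetical List pipeline from BigInt to String: "?2,10" yields 2^2 and 2^10.
  object powersOfTwo extends Pipeline[List, BigInt, String] {
    def read(uri: URI): List[BigInt] = uri.getQuery.split(",").toList.map(BigInt(_))
    def computation(in: List[BigInt]): List[String] = in.map(n => BigInt(2).pow(n.toInt).toString)
    def write(in: List[String]): List[Unit] = in.map(s => { written += s; () })
  }

  // The same trait with Option as the effect: at most one value flows through.
  object firstSegment extends Pipeline[Option, String, Int] {
    def read(uri: URI): Option[String] = uri.getPath.split("/").find(_.nonEmpty)
    def computation(in: Option[String]): Option[Int] = in.map(_.length)
    def write(in: Option[Int]): Option[Unit] = in.map(n => { written += s"first:$n"; () })
  }
}
```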

Now we can make a data Pipeline using any such type! We can have our original Pipeline of Stream from BigInt to String:

val pipeline: Pipeline[Stream, BigInt, String] = ???

We can have a Pipeline of fs2 Stream with some type construction:

type MyStream[A] = fs2.Stream[fs2.Task, A]
val pipeline: Pipeline[MyStream, BigInt, String] = ???
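Without assuming fs2 is available, the same type-alias trick can be shown with Either: fixing its error type leaves a one-parameter type constructor that fits `F[_]`. Result and parseQuery are hypothetical names for illustration.

```scala
import java.net.URI
import scala.collection.mutable.ListBuffer
import scala.util.Try

object AliasedEffect {
  trait Pipeline[F[_], A, B] {
    final def apply(uri: URI): F[Unit] = write(computation(read(uri)))
    def read(uri: URI): F[A]
    def computation(in: F[A]): F[B]
    def write(in: F[B]): F[Unit]
  }

  // Either has two type parameters; fixing the first yields the
  // one-parameter shape Pipeline expects, as MyStream does by fixing fs2.Task.
  type Result[A] = Either[String, A]

  val written = ListBuffer.empty[Int]

  // Hypothetical pipeline that parses the URI query as a single integer.
  object parseQuery extends Pipeline[Result, String, Int] {
    def read(uri: URI): Result[String] = Option(uri.getQuery).toRight("no query")
    def computation(in: Result[String]): Result[Int] =
      in.flatMap(q => Try(q.toInt).toOption.toRight(s"not a number: $q"))
    def write(in: Result[Int]): Result[Unit] = in.map(n => { written += n; () })
  }
}
```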

We can even do this with Spark:

val pipeline: Pipeline[Dataset, BigInt, String] = ???

Any pipeline your team can model as a read effect, a computation, and a write effect can be expressed with a Pipeline defined this way.

Abstraction in F[_]

I gave a talk at Typelevel today about abstracting data pipelines and some ways to ease the use of Spark in purely functional applications. It seemed to go pretty well; here you will find the video, deck (pptx, pdf), and code.

In the coming weeks I'll be posting the long-form version of the talk.