Abstraction in F[_]: Lift Implementation Details Outside the Class

We now know that we can lift implementation details outside the class to gain some freedom in the client code; we have gained abstraction through decoupling. Why not apply a similar abstraction to the rest of the code?

We had:

trait Pipeline[F[_], A, B]{
  final def apply(uri: URI)(implicit F: Functor[F]): F[Unit] = {
    val in = read(uri)
    val computed = F.map(in)(computation)
    F.map(computed)(write)
  }
  def read(uri: URI): F[A]
  def computation(in: A): B
  def write(in: B): Unit
}

And we can abstract it into:

trait Read[F[_], A] extends Function1[URI, F[A]]
trait Computation[A, B] extends Function1[A, B]
trait Write[B] extends Function1[B, Unit]

trait Pipeline[F[_], A, B]{
  final def apply(uri: URI)(implicit
    F: Functor[F],
    read: Read[F, A],
    computation: Computation[A, B],
    write: Write[B]): F[Unit] = {
    val in = read(uri)
    val computed = F.map(in)(computation)
    F.map(computed)(write)
  }
}

While we're at it, let's abstract out the apply method as well.

sealed trait Pipeline[F[_], A, B]{
  def apply(uri: URI): F[Unit]
}
object Pipeline{
  final def apply[F[_]: Functor, A, B](implicit
    read: Read[F, A],
    computation: Computation[A, B],
    write: Write[B]) = {
    val F: Functor[F] = implicitly
    new Pipeline[F, A, B]{
      override def apply(uri: URI): F[Unit] = {
        val in = read(uri)
        val computed = F.map(in)(computation)
        F.map(computed)(write)
      }
    }
  }
}

Now our trait is a simple expression of inputs to outputs. But why would we ever perform this kind of abstraction? It seems like we are complicating the problem, not making it simpler.

 

The benefits here are more subtle and only appear in certain use cases. Say you have two pipelines:

val p1: Pipeline[Stream, Int, String] = ???
val p2: Pipeline[Stream, String, Array[Char]] = ???

And you want to combine them into a single pipeline:

val pipeline: Pipeline[Stream, Int, Array[Char]] = ???

Taking two Pipeline instances and composing them is not a readily understood idea. Taking two Function1 instances and composing them, however, is very well understood. Notice that we brought the functions read, computation and write outside the class as simple Function1 instances. Abstracting the Pipeline functions outside the trait gives the developer writing the client code a clear, well-understood method for composing multiple Pipelines: compose the computations, then build a single Pipeline from the result.
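As a minimal sketch of that composition: the block below repeats the definitions from above with a hand-rolled Functor (an assumption, since the article does not say which library supplies it) and uses List in place of Stream. The helper compose, the object ComposeSketch and all sample values are hypothetical names introduced only for illustration. The instances are passed explicitly rather than implicitly to keep the sketch small.

```scala
import java.net.URI

// Hand-rolled Functor so the sketch is self-contained (assumption:
// the article's Functor may come from a library instead).
trait Functor[F[_]] {
  def map[A, B](fa: F[A])(f: A => B): F[B]
}

trait Read[F[_], A] extends Function1[URI, F[A]]
trait Computation[A, B] extends Function1[A, B]
trait Write[B] extends Function1[B, Unit]

sealed trait Pipeline[F[_], A, B] {
  def apply(uri: URI): F[Unit]
}
object Pipeline {
  def apply[F[_], A, B](implicit
      F: Functor[F],
      read: Read[F, A],
      computation: Computation[A, B],
      write: Write[B]): Pipeline[F, A, B] =
    new Pipeline[F, A, B] {
      override def apply(uri: URI): F[Unit] =
        F.map(F.map(read(uri))(computation))(write)
    }
}

object ComposeSketch {
  // Hypothetical helper: computations compose exactly like Function1.
  def compose[A, B, C](f: Computation[A, B], g: Computation[B, C]): Computation[A, C] =
    new Computation[A, C] {
      override def apply(a: A): C = g(f(a))
    }

  val listFunctor: Functor[List] = new Functor[List] {
    override def map[A, B](fa: List[A])(f: A => B): List[B] = fa.map(f)
  }

  // The computation of p1 (Int => String) and of p2 (String => Array[Char]).
  val intToString: Computation[Int, String] = new Computation[Int, String] {
    override def apply(i: Int): String = i.toString
  }
  val stringToChars: Computation[String, Array[Char]] = new Computation[String, Array[Char]] {
    override def apply(s: String): Array[Char] = s.toCharArray
  }

  // Sample read and write; the write records its input so it can be inspected.
  val read: Read[List, Int] = new Read[List, Int] {
    override def apply(uri: URI): List[Int] = List(1, 22, 333)
  }
  val written = scala.collection.mutable.ListBuffer.empty[String]
  val write: Write[Array[Char]] = new Write[Array[Char]] {
    override def apply(cs: Array[Char]): Unit = { written += new String(cs); () }
  }

  // One Pipeline[List, Int, Array[Char]] built from the composed computation.
  val pipeline: Pipeline[List, Int, Array[Char]] =
    Pipeline.apply[List, Int, Array[Char]](
      listFunctor, read, compose(intToString, stringToChars), write)
}
```

Calling ComposeSketch.pipeline(new URI("file:///p1")) reads the three sample numbers, converts each to a String and then to an Array[Char], and passes the result to write. Only the computations were composed; read comes from the first pipeline and write from the second.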

This is still an incomplete implementation. We can see a path forward for composing any number of Pipelines whose computations can be composed, but how do we compose Pipelines that accept different inputs?

A simple switching mechanism

Say we have three Pipeline instances which require separate inputs.

val p1: Pipeline[Stream, ...] = ...
val p2: Pipeline[Stream, ...] = ...
val p3: Pipeline[Stream, ...] = ...

Our application would need to accept a URI and choose which pipeline (if any) should run it.

def perform(uri: URI): Stream[Unit] = {
  if(uri.toString.contains("p1")) p1(uri)
  else if(uri.toString.contains("p2")) p2(uri)
  else if(uri.toString.contains("p3")) p3(uri)
  else Stream()
}

This is a lot of boilerplate, especially when you consider that the number of Pipelines (for any successful business) is expected to increase. Let's unpack what we have and see if we can abstract it into our Pipeline definition.

  1. Uniform input: URI is the input to ALL Pipeline instances
  2. Guards: checking a URI against some constant
  3. Constant: a value defining a Pipeline for use in a Guard
  4. Default: the case in which the input matches no Pipeline
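The four parts can be pictured with a small runtime table before any typeclass machinery is involved. This is only a sketch under stated assumptions: Run is a hypothetical stand-in for a Pipeline (anything that maps a URI to an effect), List replaces Stream to avoid the Scala 2.13 deprecation warning, and SwitchSketch, named and ran are names invented for the example.

```scala
import java.net.URI

object SwitchSketch {
  // Hypothetical stand-in for a Pipeline: a function from URI to an effect.
  type Run = URI => List[Unit]

  // Records which stand-in pipeline ran, so the behaviour can be inspected.
  val ran = scala.collection.mutable.ListBuffer.empty[String]
  def named(name: String): Run = _ => { ran += name; List(()) }

  // (3) Constant: each pipeline is paired with the value that identifies it.
  val pipelines: List[(String, Run)] =
    List("p1" -> named("p1"), "p2" -> named("p2"), "p3" -> named("p3"))

  // (1) Uniform input: every pipeline accepts the same URI.
  def perform(uri: URI): List[Unit] =
    pipelines
      // (2) Guard: pick the first pipeline whose constant occurs in the URI.
      .collectFirst { case (constant, run) if uri.toString.contains(constant) => run }
      .map(run => run(uri))
      // (4) Default: nothing matched, so return the empty effect.
      .getOrElse(List.empty)
}
```

For example, SwitchSketch.perform(new URI("file:///data/p2/in.csv")) runs only the second stand-in, and a URI that contains none of the constants yields the empty List. The table resolves everything at runtime and relies on every entry sharing one effect type; the rest of this section encodes the same four parts in types instead.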

Our uniform input means we don't have to worry about which Pipeline can take what Types of values. This is already abstract enough.

We'll build a typeclass to model Guards and Constants associated with each pipeline.

trait Guard[-T]{
  def name: String
}
// F is a type parameter here so that F[Unit] below is in scope.
sealed trait Pipeline[-T, F[_], A, B]{
  def apply(uri: URI): F[Unit]
}
object Pipeline{
  final def apply[T: Guard, F[_]: Functor, A, B](implicit
    read: Read[F, A],
    computation: Computation[A, B],
    write: Write[B]): Pipeline[T, F, A, B] = {
    val G: Guard[T] = implicitly
    val F: Functor[F] = implicitly
    new Pipeline[T, F, A, B]{
      // What can we return when the guard does not match?
      override def apply(uri: URI): F[Unit] = ???
    }
  }
}

We have an issue here. The last else case of our perform function returns an empty Stream, but in the Pipeline object we don't know what our effect type is, so we cannot return an empty version of it. This problem takes me back to a talk given by Rúnar Bjarnason last year, in which he describes how liberating our types constrains our implementation, and constraining our types liberates our implementation. We have liberated all of our types here (except URI), leaving ourselves no room to implement what we need. So we need to constrain a type in order to regain the ability to implement our function. Let's constrain our output type.

 
trait Guard[-T]{
  def name: String
}
sealed trait Pipeline[-T, A, B]{
  type Out
  def apply(uri: URI): Out
}
object Pipeline{
  // Assumed alias: Default is used below but is not defined in this section.
  type Default[T, F[_], A, B] = Pipeline[T, A, B]{
    type Out = Either[Unit, F[Unit]]
  }
  final def apply[T: Guard, F[_]: Functor, A, B](implicit
    read: Read[F, A],
    computation: Computation[A, B],
    write: Write[B]): Default[T, F, A, B] = {
    val G: Guard[T] = implicitly
    val F: Functor[F] = implicitly
    new Pipeline[T, A, B]{
      type Out = Either[Unit, F[Unit]]
      override def apply(uri: URI): Out = {
        val from = uri.toString
        // Right: the guard matched and the pipeline ran. Left: it did not apply.
        if(from.contains(G.name)) Right{
          val in = read(uri)
          val computed = F.map(in)(computation)
          F.map(computed)(write)
        } else Left(())
      }
    }
  }
}

So our client code becomes:

trait P1
trait P2
trait P3
implicit def guardP1: Guard[P1] = new Guard[P1]{
  final override def name: String = "p1"
}
implicit def guardP2: Guard[P2] = new Guard[P2]{
  final override def name: String = "p2"
}
implicit def guardP3: Guard[P3] = new Guard[P3]{
  final override def name: String = "p3"
}
implicit def p1: Pipeline[P1, Stream, ...]
implicit def p2: Pipeline[P2, Stream, ...]
implicit def p3: Pipeline[P3, Stream, ...]
def perform(uri: URI): Either[Either[Either[Unit, Stream[Unit]], Stream[Unit]], Stream[Unit]] = {
  p1(uri).fold(
    _ => Left(p2(uri).fold(
      _ => Left(p3(uri).fold(
        _ => Left(()),
        a => Right(a)
      )),
      a => Right(a)
    )),
    a => Right(a)
  )
}

This has made things much worse. There is even more boilerplate, and the nesting will become unreasonable in short order. But we have something we can easily reason about at the type level:

  • We are given a known set of Pipeline instances
  • We create a computation that runs at most one Pipeline
  • The result is a nested data structure

These characteristics indicate we can take an inductive approach to building our Pipeline library. Enter Shapeless.