Abstraction in F[_]: Implicits and Induction

Now we have a set of Pipeline instances

trait P1; trait P2; trait P3
implicit def guardP1: Guard[P1] = new Guard[P1]{
  final override def name: String = "p1"
}
implicit def guardP2: Guard[P2] = new Guard[P2]{
  final override def name: String = "p2"
}
implicit def guardP3: Guard[P3] = new Guard[P3]{
  final override def name: String = "p3"
}
implicit def p1: Pipeline[P1, Stream, ...]
implicit def p2: Pipeline[P2, Stream, ...]
implicit def p3: Pipeline[P3, Stream, ...]

And we need to construct a Pipeline which represents the choosing of one of these Pipeline instances. In pseudocode:

val pipeline: Pipeline[P1 or P2 or P3, Stream, ...]

In the previous post, given three Pipeline instances, we produced an Either[Either[Either[...]]] from a URI. More abstractly, given three items we produced a recursive structure with three levels.

Building Recursive Structures with Shapeless

The shapeless library provides machinery for composing these kinds of recursive structures.

A Brief Intro to Shapeless

Two of the more widely used structures in shapeless are HList and Coproduct.

HList is a Heterogeneous List: a List that retains type information for each element rather than a single type for all elements. For example, in the REPL:

scala> 1 :: "1" :: 1.0 :: Nil
res1: List[Any] = List(1, 1, 1.0)

scala> 1 :: "1" :: 1.0 :: HNil
res2: shapeless.::[Int,shapeless.::[String,shapeless.::[Double,shapeless.HNil]]] = 1 :: 1 :: 1.0 :: HNil

The List comes back with a type parameter of Any while the HList knows it has an Int, a String and a Double. An HList is a Product: it maintains that values of all of its types exist.

A Coproduct is the Categorical Dual of a Product: it maintains that a value of exactly one of its types exists. In the REPL:

scala> type T = Int :+: Char :+: Double :+: CNil
defined type alias T

scala> val t: T = Inl(1)
t: T = Inl(1)

scala> val t: T = Inr(Inl('1'))
t: T = Inr(Inl(1))

scala> val t: T = Inr(Inr(Inl(1.0)))
t: T = Inr(Inr(Inl(1.0)))

We can see these are both recursive data structures. Shapeless excels at these kinds of things in Scala.

Building a Recursive Either Inductively

Induction is performed by noting two things:

  1. A base case from which one may begin the process
  2. An inductive case from which one may continue the process

We can model this in Scala using functions and implicits.
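As a warm-up, induction with implicits can be seen in miniature without any pipeline machinery. In this sketch the Depth type class is purely illustrative (not part of the pipeline code); it counts how deeply a type is wrapped in Option:

```scala
trait Depth[A] { def value: Int }

object Depth {
  // 1. the base case from which we begin the process
  implicit val base: Depth[Int] =
    new Depth[Int] { val value = 0 }

  // 2. the inductive case from which we continue:
  //    given a Depth[A], produce a Depth[Option[A]]
  implicit def inductive[A](implicit d: Depth[A]): Depth[Option[A]] =
    new Depth[Option[A]] { val value = d.value + 1 }

  def apply[A](implicit d: Depth[A]): Depth[A] = d
}
```

Resolving Depth[Option[Option[Int]]] makes the compiler apply the inductive case twice and then the base case once, which is exactly the shape of derivation we are after for pipelines.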

What we have in our nested Either pipeline from the previous post, going from the inside (simplest) case outward, is

  1. an empty Stream
  2. or a p3(uri)
  3. or a p2(uri)
  4. or a p1(uri)

One way to picture this is to say: given a way to produce an empty Stream, we can produce a p3(uri) or an empty Stream. Given a (p3(uri) or an empty Stream), we can produce a (p2(uri) or p3(uri) or an empty Stream). And finally, given a (p2(uri) or p3(uri) or an empty Stream), we can produce a (p1(uri) or p2(uri) or p3(uri) or an empty Stream).

Our base case is Stream() and our inductive step prepends the possibility of a Pipeline instance.
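This inside-out construction can be sketched in plain Scala. The p1/p2/p3 bodies below are hypothetical stand-ins for the real pipelines; only the shape of the nesting matters:

```scala
// hypothetical stand-ins: Some(result) when the guard matches, else None
def p1(uri: String): Option[String] =
  if (uri.contains("p1")) Some("handled by p1") else None
def p2(uri: String): Option[String] =
  if (uri.contains("p2")) Some("handled by p2") else None
def p3(uri: String): Option[String] =
  if (uri.contains("p3")) Some("handled by p3") else None

// built from the inside out, one layer per pipeline
type Or0 = Unit                // an empty result
type Or1 = Either[Or0, String] // or a p3(uri)
type Or2 = Either[Or1, String] // or a p2(uri)
type Or3 = Either[Or2, String] // or a p1(uri)

// toRight puts a hit in Right and defers to the next layer on a miss
def choose(uri: String): Or3 =
  p1(uri).toRight(p2(uri).toRight(p3(uri).toRight(())))
```

Each additional pipeline adds one more Either layer by hand, which is precisely the boilerplate the inductive implicits below will generate for us.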

First, a preliminary type alias

object Pipeline{
  type Aux[T, A, B, O] = Pipeline[T, A, B]{type Out = O}
  ...
}

Adding the Aux alias allows us to skip the type refinement each time we need to refer to the type member of our Pipeline trait. It is a nifty trick for "lifting" a type member into a type parameter to get around an annoyance of the language.
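The Aux pattern is easiest to see on a tiny, self-contained example. Second here is a hypothetical type class, not part of the pipeline code:

```scala
trait Second[A] { type Out; def get(a: A): Out }

object Second {
  // Aux lifts the Out member into a type parameter...
  type Aux[A, O] = Second[A] { type Out = O }

  implicit def pair[X, Y]: Second.Aux[(X, Y), Y] =
    new Second[(X, Y)] { type Out = Y; def get(a: (X, Y)): Y = a._2 }
}

// ...which lets other signatures refer to it as an ordinary parameter
def secondOf[A, O](a: A)(implicit s: Second.Aux[A, O]): O = s.get(a)

val n: Int = secondOf(("a", 1)) // Out is visible to the caller as Int
```

Without the Aux alias, the refinement Second[A]{type Out = O} would have to be written out longhand at every use site.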

Generalizing our inductive Stream Pipeline, we have a base Pipeline instance:

implicit def PNil: Pipeline.Aux[CNil, CNil, CNil, Unit:+:CNil] = {
  new Pipeline[CNil, CNil, CNil]{
    type Out = Unit:+:CNil
    override def apply(uri: URI): Out = Inl(())
  }
}

This basically says: given nothing, we can get a meaningless Pipeline instance. It is just a stub or placeholder so that we may begin the inductive process.

Our inductive Pipeline step builds upon a previous step prepending the possibility of a particular Pipeline:

implicit def inductivePipeline[TH, F[_], AH, BH,
  TT <: Coproduct, AT <: Coproduct, BT <: Coproduct, OT <: Coproduct
](implicit
  head: Pipeline.Aux[TH, AH, BH, Either[Unit, F[Unit]]],
  tail: Pipeline.Aux[TT, AT, BT, OT]
): Pipeline.Aux[TH:+:TT, AH:+:AT, BH:+:BT, F[Unit]:+:OT] = {
  new Pipeline[TH:+:TT, AH:+:AT, BH:+:BT]{
    final override type Out = F[Unit]:+:OT
    final override def apply(uri: URI): Out = {
      head(uri).fold(
        {_ =>
          Inr(tail(uri))
        },
        s => Inl(s)
      )
    }
  }
}

This basically says: given two Pipeline instances, the second of which has type parameters all of which are themselves Coproduct instances, we can produce a Pipeline which has type parameters all of which are Coproducts, whose left case is the first input Pipeline instance's type parameters. This requires unpacking.

Given two Pipeline instances

This is not too surprising: our head and tail.

The second of which has type parameters all of which are themselves Coproduct instances

Our tail is a Pipeline constructed of Coproducts. This tail is the first ingredient for our inductive steps. Recall the base case which is a Pipeline.Aux[CNil, CNil, CNil, Unit:+:CNil]. All of the type parameters here are indeed Coproduct instances. So the first step of our example can take the base case as its tail instance.

We can produce a Pipeline which has type parameters all of which are Coproducts

The return value is a Pipeline with Coproducts for type parameters. This is important because this implies each step of the inductive process can be used as the tail in another step of the inductive process.

Whose left case is the first input Pipeline instance's type parameters

The head becomes the left most case in the resulting Pipeline instance.

Using our Inductive Pipeline

Now that we've done all this work, it is time to reap the benefits!

implicit val pipeline1 = Pipeline[One, …]
implicit val pipeline2 = Pipeline[Two, …]
implicit val pipeline3 = Pipeline[Three, …]
val pipeline4: Pipeline.Aux[
  One :+: Two :+: Three :+: CNil, …,
  Stream[Unit] :+: Stream[Unit] :+: Stream[Unit] :+: Unit :+: CNil] = implicitly
pipeline4(uri)

This. Is. Awful...

The type annotations here are brutal. Of course, with more shapeless machinery we can overcome this but a much simpler solution arises when we look first at what we are trying to accomplish.

What we want

implicit val pipeline1 = Pipeline[One, …]
implicit val pipeline2 = Pipeline[Two, …]
implicit val pipeline3 = Pipeline[Three, …]
val pipeline4 = pipeline1 op pipeline2 op pipeline3 op PNil
pipeline4(uri)

If we can develop an operator which can stitch pipelines together, we are finished. No fancy macros. No fancy generics. No fancy anything. Just an operator.

We can accomplish this with an implicit class extending AnyVal and an operator which ends with a ':' character (making it right-associative):

implicit class Ops[TT <: Coproduct,
  AT <: Coproduct, BT <: Coproduct, OT <: Coproduct
](
  val tail: Pipeline.Aux[TT, AT, BT, OT]
) extends AnyVal{
  def +:[TH, F[_], AH, BH](
    head: Pipeline.Aux[TH, AH, BH, Either[Unit, F[Unit]]]
  ): Pipeline.Aux[TH:+:TT,AH:+:AT, BH:+:BT,F[Unit]:+:OT] =
    inductivePipeline[
      TH, F, AH, BH, TT, AT, BT, OT
    ](head, tail)
}

This says the same thing as the inductivePipeline function but it introduces a nice infix operator rather than requiring a lot of complicated machinery to get the compiler to "do it for us". Thus, we have our combined pipeline:

implicit val pipeline1 = Pipeline[One, …]
implicit val pipeline2 = Pipeline[Two, …]
implicit val pipeline3 = Pipeline[Three, …]
val pipeline4 = pipeline1 +: pipeline2 +: pipeline3 +: PNil
pipeline4(uri)

This has reasonable type annotations and, more importantly, is straightforward. The syntax is exactly like that of Seq construction in the collections library.
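The parse order comes from Scala's rule that any operator whose name ends in ':' is right-associative and dispatches on its right operand. A toy cons list shows the mechanism in isolation (MyList, MyNil and Cons are illustrative names, not part of the pipeline code):

```scala
object RightAssocDemo {
  sealed trait MyList
  case object MyNil extends MyList
  final case class Cons(head: Int, tail: MyList) extends MyList

  // same trick as Ops above: an implicit value class whose
  // operator name ends in ':'
  implicit class ConsOps(val tail: MyList) extends AnyVal {
    def +:(head: Int): MyList = Cons(head, tail)
  }
}
import RightAssocDemo._

// parses as 1 +: (2 +: (3 +: MyNil)), so MyNil's ConsOps applies first
val xs = 1 +: 2 +: 3 +: MyNil
```

The rightmost element (MyNil here, PNil in the pipeline) seeds the chain, and each +: to its left wraps one more layer, mirroring the inductive step.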

Now we have a scalable pipeline library. It is easy to construct pipelines and merge them, and rearranging the order of the pipelines is trivial. There are, however, two things to consider.

1. PNil is a def rather than a val

I am totally lost here. For some reason, the typer cannot unify the types at the implicit conversion call site when PNil is a val. This is easy enough to resolve but perplexing nonetheless.

2. This will not work with spark Dataset

Spark Dataset requires that its type parameters have Encoder context bounds, while cats Functor places no context bounds on its type parameters. There is an easy workaround for this using cats Id and a custom Functor-like type class:

trait BoundedFunctor[F[_], B[_]]{
  def map[U: B, V: B](fu: F[U])(f: U => V): F[V]
}
trait Functor[F[_]] extends BoundedFunctor[F, Id]
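To see how the bounded interface subsumes the unbounded one without pulling in cats, here is a sketch that swaps cats' Id for a hypothetical always-available Trivial bound:

```scala
trait BoundedFunctor[F[_], B[_]] {
  def map[U: B, V: B](fu: F[U])(f: U => V): F[V]
}

// a bound that holds for every type, so it never gets in the way
trait Trivial[A]
object Trivial {
  implicit def instance[A]: Trivial[A] = new Trivial[A] {}
}

// a plain functor fits the bounded interface under the trivial bound
implicit val listFunctor: BoundedFunctor[List, Trivial] =
  new BoundedFunctor[List, Trivial] {
    def map[U: Trivial, V: Trivial](fu: List[U])(f: U => V): List[V] =
      fu.map(f)
  }
```

The Dataset instance below is the same idea with Encoder in place of Trivial: the bound carries exactly the evidence Spark's map requires.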

Then we can redefine our apply method as

def apply[T: Guard, F[_]: BoundedFunctor, A, B]...

The rest of the code takes on similar changes, and we can use Dataset with our pipeline by creating an instance thus:

implicit val sparkFunctor = new BoundedFunctor[Dataset, Encoder]{
  override def map[U: Encoder, V: Encoder](
    fu: Dataset[U])(f: U => V): Dataset[V] = fu.map(f)
}