Monday, October 30, 2017

Making comparisons more functional

In Scala 2.7 we got Ordering, a type class that essentially made Ordered, the original Scala extension of Java's Comparable, somewhat obsolete. Ordering (and type classes in general) is great, of course, and a big improvement. But what a shame they didn't go all the way and have Ordering provide a much more functional way of comparing.

Let me explain what I mean. Take any Java/Scala class with at least two fields in it. Implementing the comparison function, whether in Scala or Java is typically going to look like this (from the source of java.time.LocalDate.java):


int compareTo0(LocalDate otherDate) {
    int cmp = (year - otherDate.year);
    if (cmp == 0) {
        cmp = (month - otherDate.month);
        if (cmp == 0) {
            cmp = (day - otherDate.day);
        }
    }
    return cmp;
}

Do you see how ugly (and non-functional) that code is? It's fine in Java, because Java code expects to be a mix of assignments, if clauses, loops, etc. But Scala code expects to be expression-oriented, not statement-oriented. And there's no reason not to do it better in Scala, because we can create a type which can represent either equality, or difference. To be fair, Java could do this too, but that ship sailed long ago.

So, how about this type?--Comparison--a trait which extends Function0[Option[Boolean]]:


sealed trait Comparison extends (() => Option[Boolean]) {
  override def toString(): String = apply.toString
  def toInt: Int = apply match {
    case Some(b) => if (b) -1 else 1;
    case _ => 0  }
  def orElse(c: => Comparison): Comparison = Comparison(apply.orElse(c()))
  def flip: Comparison = Comparison(for (v <- apply) yield !v)
}
case class Different(less: Boolean) extends Comparison {
  def apply: Option[Boolean] = Some(less)
}
case object Same extends Comparison {
  def apply: Option[Boolean] = None
}
object Comparison {
  val more = Different(false)
  val less = Different(true)
  def apply(x: Option[Boolean]): Comparison = x match {
    case Some(b) => Different(b);
    case _ => Same
  }
  def apply(x: Int): Comparison = x match {
    case 0 => Same;
    case _ => Comparison(Some(x < 0))
  }
}

Thus, invoking the apply method of Comparison can yield one of three different instances: Some(true), Some(false), and None. These are quite sufficient to represent the result of a comparison of two objects.

Now that we have a way of expressing the result of a comparison, we can create a comparer type (note that the names Comparable, Comparator, Ordered, and Ordering are already in use). Let's choose the name as Orderable. Of course, like Ordering, we would like it to be a type class (actually we'd like it to replace Ordering). So we need another trait (unsealed, this time) like the following:


trait Orderable[T] extends (((T, T)) => Comparison) {
  self =>
  def toOrdering: Ordering[T] = new Ordering[T]() {
    def compare(x: T, y: T): Int = self(x, y).toInt
  }
  def >(tt: (T, T)): Boolean = apply(tt.swap)().getOrElse(false)
  def <(tt: (T, T)): Boolean = apply(tt)().getOrElse(false)
  def ==(tt: (T, T)): Boolean = apply(tt)().isEmpty
  def >=(tt: (T, T)): Boolean = ! <(tt)
  def <=(tt: (T, T)): Boolean = ! >(tt)
  def !=(tt: (T, T)): Boolean = ! ==(tt)
  def compose(f: Comparison => Comparison): Orderable[T] = new Orderable[T]() {
    def apply(tt: (T, T)): Comparison = f(self(tt))
  }
  def orElse(o: Orderable[T]): Orderable[T] = new Orderable[T]() {
    def apply(tt: (T, T)): Comparison = self(tt).orElse(o(tt))
  }
  def invert: Orderable[T] = compose(_ flip)
}
object Orderable {
  implicit val intOrderable: Orderable[Int] = Ordering[Int]
  implicit val strOrderable: Orderable[String] = Ordering[String]
  implicit def convert[T](x: Ordering[T]): Orderable[T] = new Orderable[T] {
    def apply(tt: (T, T)) = Comparison(x.compare(tt._1, tt._2))
  }
}
Note that, as Ordering has implicit converters to/from Ordered, so Orderable has explicit/implicit converters to/from Ordering, although perhaps it would be better to have them to/from Ordered.

There is one other minor difference here. Since we always implement the comparison function (here, it is the apply method) on a pair of T elements, we might as well make the input a Tuple2 of T. We can always convert to untupled form when necessary.

Note also the compose method which allows us to create a new Orderable based on this Orderable and a function from Comparison to Comparison.

And, in the following, we can see how the whole mechanism allows us to create a lazy sorted object, which only does an actual sort when the result is required:

case class Sorted[T](ts: Seq[T])(implicit f: Orderable[T]) extends (() => Seq[T]) {
  implicit val ordering: Ordering[T] = f.toOrdering
  def sort(o: Orderable[T]): Sorted[T] = Sorted(ts)(f orElse o)
  def apply: Seq[T] = ts.sorted
  def async(implicit ec: ExecutionContext): Future[Seq[T]] = Future(apply)
  def parSort(implicit ec: ExecutionContext): Future[Seq[T]] = Sorted.mergeSort(ts)
}
object Sorted {
  def create[T: Ordering](ts: Seq[T]): Sorted[T] = Sorted(ts)(implicitly[Ordering[T]])
  def verify[T: Orderable](xs: Seq[T]): Boolean = xs.zip(xs.tail).forall(z => implicitly[Orderable[T]].<=(z._1,z._2))
  def parSort[T: Ordering](tst: (Seq[T], Seq[T]))(implicit ec: ExecutionContext): Future[Seq[T]] = map2(Future(tst._1.sorted), Future(tst._2.sorted))(merge)
  def mergeSort[T: Ordering](ts: Seq[T])(implicit ec: ExecutionContext): Future[Seq[T]] = parSort(ts splitAt (ts.length/2))
  def merge[T: Ordering](ts1: Seq[T], ts2: Seq[T]): Seq[T] = {
    val ordering = implicitly[Ordering[T]]
    @tailrec def inner(r: Seq[T], xs: Seq[T], ys: Seq[T]): Seq[T] = (xs, ys) match {
      case (_, Nil) => r ++ xs
      case (Nil, _) => r ++ ys
      case (x :: xs1, y :: ys1) =>
        if (ordering.lt(x, y)) inner(r :+ x, xs1, ys)
        else inner(r :+ y, xs, ys1)
    }
    inner(Nil, ts1, ts2)
  }
  def map2[T: Ordering](t1f: Future[Seq[T]], t2f: Future[Seq[T]])(f: (Seq[T], Seq[T]) => Seq[T])(implicit ec: ExecutionContext): Future[Seq[T]] = for {t1 <- t1f; t2 <- t2f} yield f(t1, t2)
}
If you're interested to see a specification file, or an App for this, you can see find them in LaScala. under the package com/phasmid/laScala/sort.

Wednesday, June 21, 2017

Some observations on foldLeft with reference to short-circuiting

Have you ever written code such as the following and expected it to quit accessing values of y after it hits the first true value?

    val y = Seq[Boolean](false, true, ..., false)
    val x = y.foldLeft[Boolean](false)(_ || _)

After all, the second parameter of || is call-by-name so, if it's not needed to determine the outcome of the value for x, then that should mean that the recursion of the foldLeft should stop, right?

But it doesn't in practice, and if you look at the definition of foldLeft, you can see why. There is no short-circuit defined.

I therefore created a method in the FP module of my LaScala library to be the short-circuit equivalent of foldLeft. It's called foldLeftShort and it looks like this:

  /**
    * This method performs a foldLeft with short-circuit logic
    *
    * @param xi an iterator of X objects
    * @param y0 the initial value of Y (e.g. 0 if Y is Int)
    * @param q  the quick-return function: if this evaluates to true, given the new y value, that value is immediately returned
    * @param f  the function which accumulates the result: as with foldLeft, it takes a Y and an X and yields a Y
    * @tparam X the underlying type of the iterator
    * @tparam Y the return type
    * @return the aggregate of the given iterator, according to the initial value of y0 and the accumulator function f.
    */
  def foldLeftShort[X, Y](xi: Iterator[X], y0: Y, q: Y => Boolean)(f: (Y, X) => Y): Y = {
    def inner(_y: Y, xo: => Option[X]): Y =
      xo match {
        case None => _y
        case Some(x) =>
          val y = f(_y, x)
          if (q(y)) y
          else
            inner(y, nextOption(xi))
      }

    inner(y0, nextOption(xi))
  }

  /**
    * Method to get the next element from an iterator, as an Option.
    *
    * @param xi the iterator.
    * @tparam X the underlying type.
    * @return Some(x) where x is the next element; or None where the iterator is exhausted.
    */
  def nextOption[X](xi: Iterator[X]): Option[X] = if (xi.hasNext) Some(xi.next) else None

As with foldLeft, the accumulating function is in its own parameter set. But the initial value is joined by the iterator itself and the short-circuit function in the first parameter set. It would be nice to be able to extend the Iterator type to include this.

The inner function isn't defined in the typical way, using the concatenation unapply method (::) because, for some reason that I don't quite understand, you cannot decompose an Iterator this way. Instead, we make use of another FP method: nextOption which returns the next element of the iterator as an Option (see above).

Note the short-circuit mechanism, which really does work. As soon as we have a value by applying f to the current accumulator and the newest value from the iterator, we return if f evaluates to true.

In order to use this as efficiently, as possible, we need to write code something like the following where in this example, we want to save doing a trivial lookup too many times. That's not worth the effort--but just suppose that the lookup was actually an expensive database query instead:

    val lookupTable = Map("a"->false, "b"->true)
    val boi = ns.view.map(x => lookupTable.get(x)).toIterator
    FP.foldLeftShort[Option[Boolean], Boolean](boi, false, b => b)(_ || _.getOrElse(false))

Obviously, if you want to end up with an Option[Boolean], you would write something like this instead:


FP.foldLeftShort[Option[Boolean], Option[Boolean]](boi, Some(false), b => b.getOrElse(false))(optionalOr)

/**
    * NOTE: that we can't use map2 here because we are essentially implementing three-valued (Kleenean) logic
  *
  * @param b1 the accumulator: if b2 is None or Some(false) then b1 is returned unchanged
  * @param b2 the value: if b2 is Some(true) then Some(true) will be returned
  * @return an Optional[Boolean]
  */
def optionalOr(b1: Option[Boolean], b2: Option[Boolean]): Option[Boolean] =
    b2 match {
      case Some(x) => for (b <- b1) yield b || x
      case None => b1
    }

Note that, if you use map2(_ || _) instead of optionalOr, the logic is probably not going to give you exactly what you want, because None will combine with false to give None, whereas you really want None to have no effect on the accumulator value at all.

Saturday, June 17, 2017

Another cautionary tale of Spark and serialization

In my earlier post on the subject of Spark and Serialization, I forgot one big gotcha that has come up a couple of times for me (I thought I had mentioned it, but now I see that I didn't).

The problem is with Stream and other non-strict collections such as Iterator or SeqView. These are inherently unserializable (except for empty collections). I'm sure there are several posts on this in StackOverflow, although I couldn't actually find one when I searched just now. But, I hear you say, these are never the kind of thing you would want to store as a (non-transient) field in a class.

That's true of course, but you might inadvertently include one, as I did just recently. Let's say you have a case class like this:

case class Person(name: String, friends: Seq[String])

You create instances of this class by running a database query on the person and getting his/her friends as a collection of strings. Something like this:

val x = sqlContext.sql("select ...").collect
val p = Person("Ronaldo", x.toSeq)

The problem here, is that, if the type of x is Stream[String], then x.toSeq will not change its type since Stream extends Seq. You need to write x.toList instead (or x.toVector).

Actually, it's much better practice never to use Seq for persistable/serializable objects. Far better to force an explicit collection type in the definition of the case class. In particular, you should use either List or Vector, unless you have some specific reason to use a different sequence. In this case, that would be:

case class Person(name: String, friends: List[String])

Sunday, May 14, 2017

Some informal thoughts on monads

I was asked to answer a Quora question about monads. I wasn't entirely happy with my answer as I wasn't sure that it actually answered the question. But I've been encouraged by some upvotes and comments so I'm going to link to it here for those of you reading my blog.

Tuesday, April 25, 2017

Spark and serialization

One of the trickiest issues for Spark developers is serialization. I'm going to provide a checklist of things to think about when your Spark job doesn't work as expected. This article is directed at Scala programmers, although the principles are the same of course for Java developers (with the exception that there are no Case Classes in Java).

Serialization of functions

This is the one that newbies run into pretty quickly. Any function that you pass to one of Spark's higher-order functions (such as the map method of RDD) must be serializable. Once you think about it, it's pretty obvious but when you're new to Spark, it may not be so clear. All of the real work of Spark is performed on the executors, and so any such function has to be sent "over the wire" to the executors. Hence, the function must be serializable. Functions will be serializable in any of the following three situations:

  • the function is a "pure" function, for example one that is created as an anonymous function (i.e. a lambda);
  • the function is a partially applied method (that's to say the method has undergone the 𝜂 (eta) transformation whereby the method is followed by the underscore character) where the method is part of:
    • an object
    • a serializable class
Note what's missing: the function derives from a method belonging to a non-serializable class. This is because an instance function (i.e. belonging to a class rather than an object) has an (implicit) parameter for this, where this is the appropriate instance of the class.

UDFs and Broadcasts

Don't forget, too, that anything object which is passed as a parameter into a class representing a UDF (user-defined function), or anything which you broadcast, must also be serializable.

Abstract Classes

Any class whose instances must be serialized (including, as described above, classes where the methods are used as functions) needs to be marked Serializable. But: even if you mark your concrete class as Serializable, you may still run into a problem with serialization. In such a case, you must ensure that any abstract classes which the class extends are also marked Serializable.

Case Classes

Case classes (and case objects) in Scala are inherently serializable. This is a good thing but it can have unforeseen consequences if your case class is mutable. Let's say you create a case class which has a mutable field (or variable). Perhaps there is a method which can be called to initialize that field or variable. When you run it in the driver, everything looks good. But, if that object needs to be serialized, then the initialization method won't be called after deserialization. If you had had to explicitly mark the case class as serializable, you might have thought about this problem beforehand.

Preemptive serialization checking

If you want to check that objects/functions are serializable before you get too far into your Spark job -- or perhaps you are performing a functional test, then you can use the following Spark environment variable:

SparkEnv.get.closureSerializer

(this is the same serializer that Spark will use). Here's a handy case class/method which will validate serialization:

case class Serialization(serializer: Serializer) {
  def validate[T : ClassTag](obj: T): Boolean = {
    val serializerInstance = serializer.newInstance()
    val result: T = serializerInstance.deserialize(serializerInstance.serialize(obj))
    result==obj
  }
}

Just pass the result of  invoking SparkEnv.get.closureSerializer into the Serialization class and run the validate method on anything you want to check. If it returns true, then all is well. If it returns false, then the deserialized version isn't the same as the original (e.g. you tried to serialize a mutable class as described above). If it throws an exception, then something in the serialization stack (essentially the relevant class hierarchy) isn't marked Serializable when it should be.

Thursday, March 16, 2017

Render unto Caesar...

There's a minor issue with both Java and Scala, at least Scala out-of-the-box. That is, while there is a way to turn any object into a String for debugging purposes (it's the toString method available in both Java and Scala), there is no easy, consistent way to turn an object into a String that can be used for presentation.

In a former life, I have created methods to do this for Java. But it's frustrating at best. In Java, if you want a class to take on some particular behavior, you have to arrange for that class to extend an interface that defines the behavior. If that class, some sort of collection for instance, isn't under your control, you're out of luck.

Not so in Scala!  You can add behavior to a (someone else's) library class through the magic of implicits.

So, let's say we define the presentation version of toString in a trait called Renderable, with one method render.

trait Renderable {
  def render: String
}

Any class that you want to be able to prettify in output can extend Renderable. So far, so good. But suppose that you have a list of Renderable objects. It's not much use if you can make the elements pretty but not the whole list. You could, for example, define a RenderableSeq as follows:

case class RenderableSeq[A <: Renderable](as: Seq[A]) extends Renderable {
  def render: String = as map (_ render) mkString ","
}

But, apart from the fact that this is rather ugly and involves creating a set of special containers with which to wrap your sequences, it's simply impractical. Much of the time, you will be dealing with containers that are elements of other classes and you won't be able to inject your desired behavior.

This is where implicits come to the rescue.

import scala.util._

trait Renderable {
  def render: String
}

case class RenderableTraversable(xs: Traversable[_]) extends Renderable {
  def render: String = {
    def addString(b: StringBuilder, start: String, sep: String, end: String): StringBuilder = {
      var first = true
      b append start
      for (x <- xs) {
        if (first) {
          b append Renderable.renderElem(x)
          first = false
        }
        else {
          b append sep
          b append Renderable.renderElem(x)
        }
      }
      b append end
      b
    }
    addString(new StringBuilder, "(\n", ",\n", "\n" + ")").toString()
  }
}

case class RenderableOption(xo: Option[_]) extends Renderable {
  def render: String = xo match {
    case Some(x) => s"Some(" + Renderable.renderElem(x) + ")"
    case _ => "None"
  }
}

case class RenderableTry(xy: Try[_]) extends Renderable {
  def render: String = xy match {
    case Success(x) => s"Success(" + Renderable.renderElem(x) + ")"
    case _ => "Failure"
  }
}

case class RenderableEither(e: Either[_, _]) extends Renderable {
  def render: String = e match {
    case Left(x) => s"Left(" + Renderable.renderElem(x) + ")"
    case Right(x) => s"Right(" + Renderable.renderElem(x) + ")"
  }
}

object Renderable {
  implicit def renderableTraversable(xs: Traversable[_]): Renderable = RenderableTraversable(xs)

  implicit def renderableOption(xo: Option[_]): Renderable = RenderableOption(xo)

  implicit def renderableTry(xy: Try[_]): Renderable = RenderableTry(xy)

  implicit def renderableEither(e: Either[_, _]): Renderable = RenderableEither(e)

  def renderElem(elem: Any): String = elem match {
    case xo: Option[_] => renderableOption(xo).render
    case xs: Traversable[_] => renderableTraversable(xs).render
    case xy: Try[_] => renderableTry(xy).render
    case e: Either[_, _] => renderableEither(e).render
    case r: Renderable => r.render
    case x => x.toString
  }
}

With this design, you can simply invoke render on any object that is either Renderable itself or one of the four implicit Renderable containers defined above. A complex class with embedded options, sequences, whatever simply has to extend Renderable and all elements that can be rendered thus will be (other classes will simply be converted to String via toString). Your application code hardly needs to be aware of the Renderable trait because the implicit converters are defined, implicitly, in the Renderable companion object.

In practice, this particular render method isn't all that useful. I defined it thus to simplify the logic of the implicit definitions. In my LaScala repository on Github [currently only defined in branch V_1_0_1 but soon to be merged with the master branch], I have defined a rather more sophisticated version as follows:

trait Renderable {

  /**
    * Method to render this object in a human-legible manner.
    *
    * @param indent the number of "tabs" before output should start (when writing on a new line).
    * @param tab    an implicit function to translate the tab number (i.e. indent) to a String of white space.
    *               Typically (and by default) this will be uniform. But you're free to set up a series of tabs
    *               like on an old typewriter where the spacing is non-uniform.
    * @return a String that, if it has embedded newlines, will follow each newline with (possibly empty) white space,
    *         which is then followed by some human-legible rendition of *this*.
    */
  def render(indent: Int = 0)(implicit tab: Int => Prefix): String
}
case class Prefix(s: String) {
  override def toString: String = s
}

object Prefix {

  /**
    * The default tab method which translates indent uniformly into that number of double-spaces.
    *
    * @param indent the number of tabs before output should start on a new line.
    * @return a String of white space.
    */
  implicit def tab(indent: Int): Prefix = Prefix(" " * indent * 2)
}

The other classes are more or less exactly as shown above, but with the more complex version of render supported. It would be possible to add further complexity, for example letting the render method choose whether to use newlines rather than commas--according to the space available. But so far, I haven't felt that the extra complexity was really justified.

Friday, March 10, 2017

Some mistakes to avoid when using Scalatest

I've been writing a lot of Scalatest specifications lately. And I've discovered a couple of tips which I think are worth sharing.

First, it's a common pattern, especially if you are doing anything like parsing, that you want to do something like this:


val expr = "x>1 & (x<3 | x=99)""rule" should s"parse $expr using parseRule" in {
  val parser = new RuleParser()
  parser.parseRule(expr) should matchPattern { case scala.util.Success(_) => }
}
Unfortunately, there's a snag. If you want to repeat that specific test (and not the remainder of the Specification), it will result in a run that is green (!) but which contains the message "Empty test suite". That seems like a bug in the test runner to me--it shouldn't show green when it didn't actually test anything. But there's an easy workaround for this that will allow you to repeat that specific test. Just write it like this:

val expr = "x>1 & (x<3 | x=99)""rule" should "parse " + expr + " using parseRule" in {
  val parser = new RuleParser()
  parser.parseRule(expr) should matchPattern { case scala.util.Success(_) => }
}

It's very slightly more awkward to write the code but it's not that hard, especially if you know the trick to splitting a String. Just be sure to avoid using s"...".The second problem I ran into is a little more troubling.


behavior of "silly"
it should "use for comprehension properly" in {
  for (x <- Try("x")) yield x=="y" should matchPattern { case Success(true) => }
}
The result of running this was green:
Testing started at 11:06 AM ...


Process finished with exit code 0

See anything wrong? Clearly, the result should have been Success(false) but it apparently matched. Or did it? Actually, it the whole expression of yield through } was run as a side effect, yielding nothing. And, although I haven't really gone into detail, clearly the test runner didn't get any message about a failure. I don't think this is a bug, although it might perhaps be possible to fix it. But all you have to do is to ensure that you put parentheses around the entire for comprehension thus:
(for (x <- Try("x")) yield x=="y") should matchPattern { case Success(true) => }