Scala flatMap and Parallel Programming

We use flatMap and map all the time in Scala. With Future in particular, we chain calls using flatMap, map, and so on. But do the chained calls run in parallel? As you may already know, they don't: when you chain monadic operations, each step runs only after the previous one has finished.

Future chaining is not parallel

Let’s look at an example. Suppose there are some database-dependent functions like the ones below:

import java.time.Instant
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

case class UserInfo(userId: Int, nickname: String, emailAddress: String, account: Option[Long])

// Each function simulates a slow database call
def getNicknameById(userId: Int): Future[String] = Future {
  Thread.sleep(2000)
  "ktz"
}
def getEmailAddressByNickname(nickname: String): Future[String] = Future {
  Thread.sleep(2000)
  "helloworld@example.com"
}
def getAccountInfoByNickname(nickname: String): Future[Option[Long]] = Future {
  Thread.sleep(3000)
  None
}
def printInterval(userInfo: UserInfo, start: Instant, end: Instant): String =
  s"$userInfo take ${(end.toEpochMilli - start.toEpochMilli) / 1000} seconds"
val userId: Int = 31337

These calls take 2 seconds, 2 seconds, and 3 seconds respectively. We want to build a UserInfo by calling them. The straightforward way is a for-comprehension:

/**
 * Just use a for-comprehension
 */
val start1: Instant = Instant.now
val fUserInfo1 = for {
  nickname    <- getNicknameById(userId)
  emailAddr   <- getEmailAddressByNickname(nickname)
  accountInfo <- getAccountInfoByNickname(nickname)
} yield UserInfo(userId, nickname, emailAddr, accountInfo)
val userInfo1 = Await.result(fUserInfo1, 10.seconds)
val end1: Instant = Instant.now
println(printInterval(userInfo1, start1, end1)) // UserInfo(31337,ktz,helloworld@example.com,None) take 7 seconds

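As you can see, it takes 7 seconds (2 + 2 + 3). Nothing runs in parallel: each Future is only created inside the callback of the previous one, so each call starts after the previous call has finished.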
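To see why, it helps to write out the nested flatMap/map calls that the for-comprehension stands for. The sketch below is a hand-written desugaring, not compiler output, and uses a hypothetical helper `slow` with shortened delays (100, 100 and 150 ms) in place of the article's functions. Because each inner Future is constructed inside the previous callback, the total time is at least the sum of the delays.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper: a Future that sleeps for `millis` and then returns `value`
def slow[A](value: => A, millis: Long): Future[A] = Future {
  blocking(Thread.sleep(millis)) // `blocking` lets the global pool add threads if needed
  value
}

val startNanos = System.nanoTime()

// Hand-written equivalent of:
//   for { nickname <- ...; email <- ...; account <- ... } yield (nickname, email, account)
val desugared: Future[(String, String, Option[Long])] =
  slow("ktz", 100).flatMap { nickname =>
    // created only once the nickname has arrived
    slow("helloworld@example.com", 100).flatMap { email =>
      // created only once the email has arrived
      slow(Option.empty[Long], 150).map { account =>
        (nickname, email, account)
      }
    }
  }

val result = Await.result(desugared, 5.seconds)
val elapsedMillis = (System.nanoTime() - startNanos) / 1000000
println(s"$result took $elapsedMillis ms") // elapsed is at least 100 + 100 + 150 ms
```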

How can we make it parallel?

by function

There are two easy ways to make this parallel. The first is to write a helper function, like this:

/**
 * Call first, then use
 */
def getEmailAndAccountTuple(fNickname: Future[String]): Future[(String, Option[Long])] =
  fNickname.flatMap { nickname =>
    // Both Futures are created (and therefore start running) here, before either is awaited
    val fEmailAddr = getEmailAddressByNickname(nickname)
    val fAccountInfo = getAccountInfoByNickname(nickname)
    fEmailAddr.flatMap(email => fAccountInfo.map(account => (email, account)))
  }
val start2: Instant = Instant.now
val fNickname: Future[String] = getNicknameById(userId)
val fUserInfo2 = for {
  nickname             <- fNickname
  (email, accountInfo) <- getEmailAndAccountTuple(fNickname)
} yield UserInfo(userId, nickname, email, accountInfo)
val userInfo2 = Await.result(fUserInfo2, 10.seconds)
val end2: Instant = Instant.now
println(printInterval(userInfo2, start2, end2)) // UserInfo(31337,ktz,helloworld@example.com,None) take 5 seconds

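It takes 5 seconds (2 seconds + max(2 seconds, 3 seconds)). This is because a Future starts running as soon as it is created, not when it is chained: fEmailAddr and fAccountInfo are both created before either one is awaited. So after the email address arrives (2 seconds), we only need to wait 1 more second for accountInfo, because the two calls run in parallel.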
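The standard library already provides this "start both, then combine" step as `Future#zip` (and `zipWith` since Scala 2.12). Below is a sketch of the same helper written with `zip`, assuming shortened hypothetical delays; as in `getEmailAndAccountTuple`, what matters is that both Futures exist before they are combined.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

case class UserInfo(userId: Int, nickname: String, emailAddress: String, account: Option[Long])

// Hypothetical shortened versions of the article's lookups
def getNicknameById(userId: Int): Future[String] =
  Future { blocking(Thread.sleep(100)); "ktz" }
def getEmailAddressByNickname(nickname: String): Future[String] =
  Future { blocking(Thread.sleep(100)); "helloworld@example.com" }
def getAccountInfoByNickname(nickname: String): Future[Option[Long]] =
  Future { blocking(Thread.sleep(150)); None }

// Same idea as getEmailAndAccountTuple: both Futures are started, then zipped
def getEmailAndAccount(nickname: String): Future[(String, Option[Long])] = {
  val fEmail = getEmailAddressByNickname(nickname)   // starts now
  val fAccount = getAccountInfoByNickname(nickname)  // also starts now
  fEmail.zip(fAccount)                               // completes when both are done
}

val fUserInfo: Future[UserInfo] = for {
  nickname         <- getNicknameById(31337)
  (email, account) <- getEmailAndAccount(nickname)
} yield UserInfo(31337, nickname, email, account)

val userInfo = Await.result(fUserInfo, 5.seconds)
println(userInfo) // UserInfo(31337,ktz,helloworld@example.com,None)
```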

by using the Cartesian product and applicative

But this means writing a helper function every time, which is annoying. With Cats we can do it more simply.

/**
 * Use the Cats applicative (tuple) syntax
 */
import cats.syntax.all._
import cats.instances.future._
val start3: Instant = Instant.now
val fUserInfo3 = for {
  nickname         <- getNicknameById(userId)
  (email, account) <- (getEmailAddressByNickname(nickname), getAccountInfoByNickname(nickname)).tupled
} yield UserInfo(userId, nickname, email, account)
val userInfo3: UserInfo = Await.result(fUserInfo3, 10.seconds)
val end3: Instant = Instant.now
println(printInterval(userInfo3, start3, end3)) // UserInfo(31337,ktz,helloworld@example.com,None) take 5 seconds

It also takes 5 seconds, but it is simpler than writing a function. (Cats versions before 1.0 wrote this with the ‘|@|’ operator, as (fa |@| fb).tupled; Cats 1.0 deprecated it in favor of the tuple syntax.) Note that the speedup comes from the same place as before: both Futures are created, and therefore already running, before tupled combines them.

The complete script:

import java.time.Instant
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
case class UserInfo(userId: Int, nickname: String, emailAddress: String, account: Option[Long])
def getNicknameById(userId: Int): Future[String] = Future {
  Thread.sleep(2000)
  "ktz"
}
def getEmailAddressByNickname(nickname: String): Future[String] = Future {
  Thread.sleep(2000)
  "helloworld@example.com"
}
def getAccountInfoByNickname(nickname: String): Future[Option[Long]] = Future {
  Thread.sleep(3000)
  None
}
def printInterval(userInfo: UserInfo, start: Instant, end: Instant): String =
  s"$userInfo take ${(end.toEpochMilli - start.toEpochMilli) / 1000} seconds"
val userId: Int = 31337
/**
 * Just use a for-comprehension
 */
val start1: Instant = Instant.now
val fUserInfo1 = for {
  nickname    <- getNicknameById(userId)
  emailAddr   <- getEmailAddressByNickname(nickname)
  accountInfo <- getAccountInfoByNickname(nickname)
} yield UserInfo(userId, nickname, emailAddr, accountInfo)
val userInfo1 = Await.result(fUserInfo1, 10.seconds)
val end1: Instant = Instant.now
println(printInterval(userInfo1, start1, end1)) // UserInfo(31337,ktz,helloworld@example.com,None) take 7 seconds
/**
 * Call first, then use
 */
def getEmailAndAccountTuple(fNickname: Future[String]): Future[(String, Option[Long])] =
  fNickname.flatMap { nickname =>
    // Both Futures are created (and therefore start running) here, before either is awaited
    val fEmailAddr = getEmailAddressByNickname(nickname)
    val fAccountInfo = getAccountInfoByNickname(nickname)
    fEmailAddr.flatMap(email => fAccountInfo.map(account => (email, account)))
  }
val start2: Instant = Instant.now
val fNickname: Future[String] = getNicknameById(userId)
val fUserInfo2 = for {
  nickname             <- fNickname
  (email, accountInfo) <- getEmailAndAccountTuple(fNickname)
} yield UserInfo(userId, nickname, email, accountInfo)
val userInfo2 = Await.result(fUserInfo2, 10.seconds)
val end2: Instant = Instant.now
println(printInterval(userInfo2, start2, end2)) // UserInfo(31337,ktz,helloworld@example.com,None) take 5 seconds
/**
 * Use the Cats applicative (tuple) syntax
 */
import cats.syntax.all._
import cats.instances.future._
val start3: Instant = Instant.now
val fUserInfo3 = for {
  nickname         <- getNicknameById(userId)
  (email, account) <- (getEmailAddressByNickname(nickname), getAccountInfoByNickname(nickname)).tupled
} yield UserInfo(userId, nickname, email, account)
val userInfo3: UserInfo = Await.result(fUserInfo3, 10.seconds)
val end3: Instant = Instant.now
println(printInterval(userInfo3, start3, end3)) // UserInfo(31337,ktz,helloworld@example.com,None) take 5 seconds

Applicative in Scala Cats

I kept hearing about Applicative, but I didn’t really know what it was, so I wrote this post to understand it myself. The picture below is from adit.io, and the examples are from Mastering Advanced Scala.

What is Applicative?

While programming, you will often run into a situation like this:

def getSome(a: Int): Option[Int] = Some(a)

def getNone(a: Int): Option[Int] = None

def add(a: Int, b: Int): Int = a + b

val aOpt = getSome(1)

val bOpt = getSome(2)

I want to add aOpt and bOpt together with add, but aOpt and bOpt are Options, not Ints. The usual way is to combine flatMap and map like this:


aOpt.flatMap(a => bOpt.map(b => add(a, b)))
// res0: Option[Int] = Some(3)

This is where Applicative comes in.

What is Applicative?

I’m not sure my understanding of Applicative is entirely right, but here is how I think of it:

[Figure: applicative_just.png, a very famous picture about Applicative]

Here there is a wrapped value, 2, and a wrapped function. We unwrap both the function and the value, apply the function, and wrap the result again. We can apply the same idea to the example above:


import cats.instances.option._
import cats.Applicative

Applicative[Option].map2(aOpt, bOpt)(add)
// res1: Option[Int] = Some(3)

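With the syntax import, we can also write it like this:


import cats.syntax.all._

(aOpt, bOpt).mapN(add)
// res2: Option[Int] = Some(3)

In Cats versions before 1.0, this was written (aOpt |@| bOpt).map(add); the ‘|@|’ operator was deprecated in Cats 1.0 in favor of mapN.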
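To connect this back to the picture, here is a hand-rolled sketch for Option written without Cats. The local functions ap and map2 only mirror the names of the Cats methods; they are illustrations, not the library’s implementation, and the wrapped function (x: Int) => x + 3 is a hypothetical example paired with the wrapped value 2. ap is the "unwrap both, apply, rewrap" step, and map2 can be built on top of it.

```scala
// Apply a wrapped function to a wrapped value: unwrap both, apply, rewrap.
def ap[A, B](ff: Option[A => B])(fa: Option[A]): Option[B] =
  (ff, fa) match {
    case (Some(f), Some(a)) => Some(f(a))
    case _                  => None // if either side is missing, so is the result
  }

// map2 built from ap: wrap a curried function with map, then ap the second value.
def map2[A, B, C](fa: Option[A], fb: Option[B])(f: (A, B) => C): Option[C] =
  ap(fa.map(a => (b: B) => f(a, b)))(fb)

val wrappedPlus3: Option[Int => Int] = Some((x: Int) => x + 3)
val five = ap(wrappedPlus3)(Some(2)) // Some(5)

def add(a: Int, b: Int): Int = a + b
val three = map2(Option(1), Option(2))(add)            // Some(3)
val missing = map2(Option(1), Option.empty[Int])(add)  // None
```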

Monad extends Applicative


@typeclass trait Monad[F[_]] extends FlatMap[F] with Applicative[F] {
  override def map[A, B](fa: F[A])(f: A => B): F[B] =
    flatMap(fa)(a => pure(f(a)))
}

As you can see, Monad extends Applicative, and Applicative in turn extends Functor (through Apply). So the hierarchy is ‘Monad <: Applicative <: Functor’.
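A simplified, hand-rolled model of this hierarchy shows why every Monad is also an Applicative. These traits are not the real Cats definitions (Cats has intermediate type classes such as Apply and FlatMap, plus simulacrum-generated syntax); they are a sketch in which an instance only supplies pure and flatMap, and map and ap are derived, map exactly as in the excerpt above.

```scala
trait Functor[F[_]] {
  def map[A, B](fa: F[A])(f: A => B): F[B]
}

trait Applicative[F[_]] extends Functor[F] {
  def pure[A](a: A): F[A]
  def ap[A, B](ff: F[A => B])(fa: F[A]): F[B]
  // map2 is derivable from map and ap
  def map2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C] =
    ap(map(fa)(a => (b: B) => f(a, b)))(fb)
}

trait Monad[F[_]] extends Applicative[F] {
  def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]
  // Same derivation as the Cats excerpt: map via flatMap and pure
  override def map[A, B](fa: F[A])(f: A => B): F[B] =
    flatMap(fa)(a => pure(f(a)))
  // ap can be derived too, so a Monad gets all Applicative operations for free
  override def ap[A, B](ff: F[A => B])(fa: F[A]): F[B] =
    flatMap(ff)(f => map(fa)(f))
}

// An Option instance only needs pure and flatMap
val optionMonad: Monad[Option] = new Monad[Option] {
  def pure[A](a: A): Option[A] = Some(a)
  def flatMap[A, B](fa: Option[A])(f: A => Option[B]): Option[B] = fa.flatMap(f)
}

// Because Monad <: Applicative <: Functor, the same instance works at every level
val asFunctor: Functor[Option] = optionMonad
val asApplicative: Applicative[Option] = optionMonad

val mapped = asFunctor.map(Option(20))(_ + 1)                // Some(21)
val summed = asApplicative.map2(Option(1), Option(2))(_ + _) // Some(3)
```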

Tip – Traverse

While studying Mastering Advanced Scala, I found some tips related to Applicative, so I’m writing them down here. Suppose there is a List[Int] and you want to apply the ‘getSome’ function to every element, for example:


val ints = List(1,2,3,4,5)
ints.map(getSome)
// res3: List[Option[Int]] = List(Some(1), Some(2), Some(3), Some(4), Some(5))

The result is ‘List(Some(1), Some(2), Some(3), Some(4), Some(5))’. If you want ‘Some(List(1, 2, 3, 4, 5))’ instead, you can use Traverse like this:


import cats.Traverse
import cats.instances.list._
import cats.instances.option._

Traverse[List].traverse(ints)(getSome)

// res4: Option[List[Int]] = Some(List(1, 2, 3, 4, 5))

It returns None if the function returns None for any element:


def getSomeOrNone(a: Int): Option[Int] =
  if(a % 2 == 0) Some(a)
  else None

Traverse[List].traverse(ints)(getSomeOrNone)

// res5: Option[List[Int]] = None
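Under the hood, traverse only needs Applicative operations. Here is a hand-rolled sketch specialised to List and Option; the name traverseOption is hypothetical and is not a Cats method. It folds the list from the right, starting from Some(Nil) and combining each element’s Option with the accumulated Option using a map2-style step, so a single None anywhere makes the whole result None.

```scala
def getSome(a: Int): Option[Int] = Some(a)

def getSomeOrNone(a: Int): Option[Int] =
  if (a % 2 == 0) Some(a)
  else None

// map2 for Option: combine only when both values are present
def map2[A, B, C](fa: Option[A], fb: Option[B])(f: (A, B) => C): Option[C] =
  for { a <- fa; b <- fb } yield f(a, b)

// List[A] and A => Option[B] become Option[List[B]]
def traverseOption[A, B](as: List[A])(f: A => Option[B]): Option[List[B]] =
  as.foldRight(Option(List.empty[B])) { (a, acc) =>
    map2(f(a), acc)(_ :: _) // prepend the element if both sides are Some
  }

val ints = List(1, 2, 3, 4, 5)
val allSome = traverseOption(ints)(getSome)        // Some(List(1, 2, 3, 4, 5))
val someNone = traverseOption(ints)(getSomeOrNone) // None
```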

The complete script:

def getSome(a: Int): Option[Int] = Some(a)
def getNone(a: Int): Option[Int] = None
def add(a: Int, b: Int): Int = a + b
val aOpt = getSome(1)
val bOpt = getSome(2)
aOpt.flatMap(a => bOpt.map(b => add(a, b)))
import cats.instances.option._
import cats.Applicative
Applicative[Option].map2(aOpt, bOpt)(add)
import cats.syntax.all._
(aOpt, bOpt).mapN(add)
val ints = List(1, 2, 3, 4, 5)
ints.map(getSome)
import cats.Traverse
import cats.instances.list._
import cats.instances.option._
Traverse[List].traverse(ints)(getSome)
def getSomeOrNone(a: Int): Option[Int] =
  if (a % 2 == 0) Some(a)
  else None
Traverse[List].traverse(ints)(getSomeOrNone)

Cartesian Product in Scala Cats

At work, I had a problem handling three Futures at the same time. The first Future returns a Long value; the second and third Futures take that value as a parameter, and I want their results as a tuple.

The code looked like this:


for {
  long <- futureLong()
  int <- futureInt(long)
  char <- futureChar(long)
} yield (int, char)

Of course, a for-comprehension runs step by step: futureInt is called only after futureLong has finished, and futureChar only after futureInt has finished. So it takes too long to get the final value.

So I changed the code like this:


val longVal: Future[Long] = futureLong()

val intVal: Future[Int] = for {
  long <- longVal
  int  <- futureInt(long)
} yield int

val charVal: Future[Char] = for {
  long <- longVal
  char <- futureChar(long)
} yield char

val forComp: (Int, Char) = Await.result(for {
  int  <- intVal
  char <- charVal
} yield (int, char), 5.seconds)

Yes, it works as I want. But there is duplicated code, such as ‘long <- longVal’, and it takes three for-comprehensions. While I was thinking about this for a long time, my coworker, Liam, recommended using the Cartesian product.

What is Cartesian Product?

The Cartesian product is very simple: it is the set of all ordered pairs formed from two sets. For example, if there are two sets A = {1, 2, 3} and B = {‘a’, ‘b’, ‘c’}, the Cartesian product A × B is

{(1, ‘a’), (1, ‘b’), (1, ‘c’), (2, ‘a’), (2, ‘b’), (2, ‘c’), (3, ‘a’), (3, ‘b’), (3, ‘c’)}

That’s all. Note that the Cartesian product is not commutative. For example, B × A is

{(‘a’, 1), (‘a’, 2), (‘a’, 3), (‘b’, 1), (‘b’, 2), (‘b’, 3), (‘c’, 1), (‘c’, 2), (‘c’, 3)}

which is different from A × B.
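The definition can be checked with a plain for-comprehension over Lists, using only the standard library (the helper name cartesian is just for illustration). It also confirms that A × B and B × A contain the same pairs flipped, so the two results differ even though they have the same size.

```scala
val a: List[Int] = List(1, 2, 3)
val b: List[Char] = List('a', 'b', 'c')

// Every element of the first list paired with every element of the second
def cartesian[X, Y](xs: List[X], ys: List[Y]): List[(X, Y)] =
  for {
    x <- xs
    y <- ys
  } yield (x, y)

val aXb = cartesian(a, b)
val bXa = cartesian(b, a)
println(aXb) // List((1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c))
println(bXa) // List((a,1), (a,2), (a,3), (b,1), (b,2), (b,3), (c,1), (c,2), (c,3))

// Not commutative: the pairs are flipped, so the lists are not equal
val commutes = aXb == bXa // false
// ...but both have |A| * |B| elements
val sameSize = aXb.size == bXa.size // true
```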

Cartesian Product in cats

The example above can be implemented in Scala with Cats (here ints also contains 4):


import cats.instances.list._
import cats.syntax.all._

val ints: List[Int] = List(1, 2, 3, 4)
val chars: List[Char] = List('a', 'b', 'c')

(ints, chars).tupled
// res0: List[(Int, Char)] = List((1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c), (4,a), (4,b), (4,c))
(chars, ints).tupled
// res1: List[(Char, Int)] = List((a,1), (a,2), (a,3), (a,4), (b,1), (b,2), (b,3), (b,4), (c,1), (c,2), (c,3), (c,4))

(ints, chars).mapN((i, c) => s"$i$c")
// res2: List[String] = List(1a, 1b, 1c, 2a, 2b, 2c, 3a, 3b, 3c, 4a, 4b, 4c)

In Cats, the Cartesian product comes from the Semigroupal type class (called Cartesian before Cats 1.0). Before Cats 1.0 its syntax was ‘|@|’ (some people call it Oink, others Pizza box), as in (ints |@| chars).tupled; since Cats 1.0 the same thing is written with tuple syntax, as above. Calling tupled returns a List of tuples, and mapN combines each pair with a function instead.

Cartesian Product in Future

OK, let’s return to the previous problem. I need to call futureInt and futureChar at the same time, and using three for-comprehensions is too messy. So how can I solve this with the Cartesian product? It’s very simple.


for {
  long        <- futureLong()
  (int, char) <- (futureInt(long), futureChar(long)).tupled
} yield (int, char)

Very simple, right? You can test whether this expression really runs futureInt and futureChar at the same time:

package org.example.ktz.blog

import cats.syntax.all._
import cats.instances.future._
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import java.util.concurrent.TimeoutException
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

class CartesianProductTest extends AnyFlatSpec with Matchers {
  def futureLong(): Future[Long] = Future {
    Thread.sleep(2000)
    println("Future Long Done!")
    2
  }
  def futureInt(long: Long): Future[Int] = Future {
    Thread.sleep(2000)
    println("Future Int Done!")
    long.toInt
  }
  def futureChar(long: Long): Future[Char] = Future {
    Thread.sleep(2000)
    println("Future Char Done!")
    long.toChar
  }

  "Concurrent" should "for comprehension" in {
    // Sequential: 2 + 2 + 2 = 6 seconds, longer than the 5-second limit
    a [TimeoutException] should be thrownBy {
      Await.result(for {
        long <- futureLong()
        int  <- futureInt(long)
        char <- futureChar(long)
      } yield (int, char), 5.seconds)
    }
    // java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
  }

  "Concurrent" should "for comprehension2" in {
    val longVal: Future[Long] = futureLong()
    val intVal: Future[Int] = for {
      long <- longVal
      int  <- futureInt(long)
    } yield int
    val charVal: Future[Char] = for {
      long <- longVal
      char <- futureChar(long)
    } yield char
    val forComp: (Int, Char) = Await.result(for {
      int  <- intVal
      char <- charVal
    } yield (int, char), 5.seconds)
    forComp shouldBe ((2, 2.toChar)) // Works well 🙂
  }

  "Concurrent" should "cartesian" in {
    val cartesian: (Int, Char) = Await.result(for {
      long        <- futureLong()
      (int, char) <- (futureInt(long), futureChar(long)).tupled
    } yield (int, char), 5.seconds)
    cartesian shouldBe ((2, 2.toChar)) // Works well 🙂
  }
}

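In this test, the first case runs into a TimeoutException, because its three steps take 2 + 2 + 2 = 6 seconds, more than the 5-second limit, while the second and third cases finish in about 4 seconds (2 + max(2, 2)). So we can conclude that the last expression also runs futureInt and futureChar at the same time.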
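The timeout-based test depends on wall-clock timing. Below is a sketch of a timing-independent check using only the standard library, under a few assumptions: meetAt and the quick futureLong are hypothetical stand-ins, the 1-second wait is arbitrary, and Future#zip replaces the Cats tuple syntax so there are no dependencies (in both cases the two Futures are created before they are combined). Each probe counts down a shared java.util.concurrent.CountDownLatch and then waits for the other. If the probes overlap, both see the latch reach zero; if the second probe is only created after the first completes, the first one gives up waiting.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical probe: count down the shared latch, then wait up to 1 second
// for the other probe. Returns true only if both probes overlapped in time.
def meetAt(latch: CountDownLatch): Future[Boolean] = Future {
  latch.countDown()
  blocking(latch.await(1, TimeUnit.SECONDS))
}

// Hypothetical fast stand-in for the test's futureLong
def futureLong(): Future[Long] = Future(2L)

// Both probes are created before zip combines them, so they can overlap
val concurrentLatch = new CountDownLatch(2)
val concurrentRun: Future[(Boolean, Boolean)] = for {
  _    <- futureLong()
  both <- meetAt(concurrentLatch).zip(meetAt(concurrentLatch))
} yield both

// The second probe is only created after the first one has completed
val sequentialLatch = new CountDownLatch(2)
val sequentialRun: Future[(Boolean, Boolean)] = for {
  _      <- futureLong()
  first  <- meetAt(sequentialLatch)
  second <- meetAt(sequentialLatch)
} yield (first, second)

val concurrentResult = Await.result(concurrentRun, 5.seconds) // (true,true)
val sequentialResult = Await.result(sequentialRun, 5.seconds) // (false,true)
```

The latch turns "did these two calls overlap?" into a yes/no answer, so the check does not depend on how fast the machine running the test is.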