Scala flatmap and Parallel programming

We use flatMap and map all the time in Scala, and with Future we chain calls using flatMap, map, and so on. But does such a chain run in parallel? It does not. When you chain monadic steps, each step starts only after the previous one has completed, so the chain runs sequentially.

Future chaining is not parallel

Let’s look at an example. Suppose we have some database-backed functions like the ones below.

case class UserInfo(userId: Int, nickname: String, emailAddress: String, account: Option[Long])

// Simulated database lookups: each Future sleeps to stand in for query latency.
def getNicknameById(userId: Int): Future[String] = Future {
  Thread.sleep(2000)
  "ktz"
}

def getEmailAddressByNickname(nickname: String): Future[String] = Future {
  Thread.sleep(2000)
  "helloworld@example.com"
}

def getAccountInfoByNickname(nickname: String): Future[Option[Long]] = Future {
  Thread.sleep(3000)
  None
}

// Formats the result together with the elapsed wall-clock time in whole seconds.
def printInterval(userInfo: UserInfo, start: Instant, end: Instant): String =
  s"$userInfo take ${(end.toEpochMilli - start.toEpochMilli) / 1000} seconds"

val userId: Int = 31337

The three calls take 2, 2, and 3 seconds respectively, and we want to build a UserInfo from their results. The straightforward way is a for comprehension.

/**
 * Just use a for comprehension
 */
val start1: Instant = Instant.now
val fUserInfo1 = for {
  nickname    <- getNicknameById(userId)
  emailAddr   <- getEmailAddressByNickname(nickname) // created only after nickname is available
  accountInfo <- getAccountInfoByNickname(nickname)  // created only after emailAddr is available
} yield UserInfo(userId, nickname, emailAddr, accountInfo)
val userInfo1 = Await.result(fUserInfo1, 10.seconds)
val end1: Instant = Instant.now
println(printInterval(userInfo1, start1, end1)) // UserInfo(31337,ktz,helloworld@example.com,None) take 7 seconds

As you can see, it takes 7 seconds (2 sec + 2 sec + 3 sec). Each call starts only after the previous one has finished, so nothing runs in parallel.
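To see why, it may help to look at roughly what a for comprehension over futures expands to. The sketch below is not code from the gists: `delayed` is a hypothetical stand-in for the lookups, the delays are shortened, and the `blocking` wrapper is an addition for this sketch. Each later call sits inside the callback of the previous flatMap, so that future is not even created until the earlier result has arrived.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for a slow lookup (not part of the original gists).
def delayed(value: String, millis: Long): Future[String] = Future {
  blocking(Thread.sleep(millis))
  value
}

val chainStart = System.nanoTime()

// Roughly the shape of `for { a <- ...; b <- ...; c <- ... } yield (a, b, c)`.
val chained: Future[(String, String, String)] =
  delayed("nickname", 200).flatMap { a =>
    // Reached only once the first future has completed.
    delayed("email", 200).flatMap { b =>
      // Reached only once the second future has completed.
      delayed("account", 300).map { c => (a, b, c) }
    }
  }

val chainResult = Await.result(chained, 5.seconds)
val chainMillis = (System.nanoTime() - chainStart) / 1000000

println(s"$chainResult took about $chainMillis ms")
```

The elapsed time should come out close to the sum of the three delays, which matches the 2 + 2 + 3 seconds measured above.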

How do we make it parallel?

By a helper function

There are two easy ways to make the calls run in parallel. The first is a helper function that starts both futures before chaining them.

/**
 * Calling first and use next
 */
def getEmailAndAccountTuple(fNickname: Future[String]): Future[(String, Option[Long])] =
  fNickname.flatMap { nickname =>
    // Both futures are created here, so both start running before either is chained.
    val fEmailAddr = getEmailAddressByNickname(nickname)
    val fAccountInfo = getAccountInfoByNickname(nickname)
    fEmailAddr.flatMap(email => fAccountInfo.map(account => (email, account)))
  }

val start2: Instant = Instant.now
val fNickname: Future[String] = getNicknameById(userId)
val fUserInfo2 = for {
  nickname             <- fNickname
  (email, accountInfo) <- getEmailAndAccountTuple(fNickname)
} yield UserInfo(userId, nickname, email, accountInfo)
val userInfo2 = Await.result(fUserInfo2, 10.seconds)
val end2: Instant = Instant.now
println(printInterval(userInfo2, start2, end2)) // UserInfo(31337,ktz,helloworld@example.com,None) take 5 seconds

It takes 5 seconds (2 seconds + max(2 seconds, 3 seconds)). That is because both futures have already started before they are chained: a Future begins running as soon as it is created. Once the email address arrives after 2 seconds, we only need to wait 1 more second for accountInfo, because the two lookups ran in parallel.
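The only thing that matters here is where the futures are created. A minimal sketch of that difference follows, with the same flatMap chain written twice. `slowLookup` and `elapsedMillis` are hypothetical helpers made up for this sketch, the delays are shortened, and `blocking` is added so that the global execution context can run the sleeps side by side even on a machine with few cores.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper: a future that sleeps, then returns its label.
def slowLookup(label: String, millis: Long): Future[String] = Future {
  blocking(Thread.sleep(millis))
  label
}

// Hypothetical helper: build the future, block on it, report elapsed milliseconds.
def elapsedMillis[A](future: => Future[A]): Long = {
  val start = System.nanoTime()
  Await.result(future, 5.seconds)
  (System.nanoTime() - start) / 1000000
}

// Created inside the chain: the second future does not exist until the first completes.
val sequentialMillis = elapsedMillis {
  slowLookup("email", 200).flatMap { email =>
    slowLookup("account", 300).map(account => (email, account))
  }
}

// Created up front: both futures are already running when flatMap is called.
val overlappedMillis = elapsedMillis {
  val fEmail = slowLookup("email", 200)
  val fAccount = slowLookup("account", 300)
  fEmail.flatMap(email => fAccount.map(account => (email, account)))
}

println(s"sequential: about $sequentialMillis ms, overlapped: about $overlappedMillis ms")
```

The first timing should be near the sum of the two delays and the second near the longer of the two, the same pattern as the 7 second and 5 second runs.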

By using the Cartesian product and applicative

But having to write a helper function is annoying. We can get the same result more simply with cats.

/**
 * Use cats applicative
 */
import cats.syntax.all._
import cats.instances.future._

val start3: Instant = Instant.now
val fUserInfo3 = for {
  nickname <- getNicknameById(userId)
  // Both futures are created as the tuple is built, so they run in parallel.
  // Older cats releases (before 1.0) wrote this as (a |@| b).tupled; that builder syntax has since been removed.
  (email, account) <- (getEmailAddressByNickname(nickname), getAccountInfoByNickname(nickname)).tupled
} yield UserInfo(userId, nickname, email, account)
val userInfo3: UserInfo = Await.result(fUserInfo3, 10.seconds)
val end3: Instant = Instant.now
println(printInterval(userInfo3, start3, end3)) // UserInfo(31337,ktz,helloworld@example.com,None) take 5 seconds

It also takes 5 seconds, but it is simpler than writing a helper function.
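If adding cats is not an option, the standard library's Future.zip can pair two futures in much the same way. This is an alternative to the cats version, not something the gists use, and the sketch makes some assumptions: `emailFor` and `accountFor` are hypothetical stand-ins for the lookups, the delays are shortened, and `blocking` is added so the sleeps can overlap on a small thread pool.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-ins for the email and account lookups.
def emailFor(nickname: String): Future[String] = Future {
  blocking(Thread.sleep(300))
  s"$nickname@example.com"
}

def accountFor(nickname: String): Future[Option[Long]] = Future {
  blocking(Thread.sleep(500))
  None
}

val zipStart = System.nanoTime()

val combined: Future[(String, String, Option[Long])] =
  Future.successful("ktz").flatMap { nickname =>
    // The receiver and the argument are both evaluated, and so both started,
    // before zip pairs their results.
    emailFor(nickname).zip(accountFor(nickname)).map {
      case (email, account) => (nickname, email, account)
    }
  }

val zipped = Await.result(combined, 5.seconds)
val zipMillis = (System.nanoTime() - zipStart) / 1000000

println(s"$zipped took about $zipMillis ms")
```

As with the cats tuple syntax, this works because both lookups are called before their results are combined. zip itself does not start anything.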

All of the snippets above assume the following imports:

import java.time.Instant
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

3 thoughts on “Scala flatmap and Parallel programming”

  1. Great article. Especially applicative usage.
    While reading it, I remembered this: http://viktorklang.com/blog/Futures-in-Scala-protips-2.html
    So can I suggest another solution?

    val start4: Instant = Instant.now

    val fUserInfo4 = for {
      nickname <- getNicknameById(userId)
      emailAddrF = getEmailAddressByNickname(nickname)
      accountInfoF = getAccountInfoByNickname(nickname)
      emailAddr <- emailAddrF
      accountInfo <- accountInfoF
    } yield UserInfo(userId, nickname, emailAddr, accountInfo)

    val userInfo4 = Await.result(fUserInfo4, 10.second)

    val end4: Instant = Instant.now

    println(printInterval(userInfo4, start4, end4)) // UserInfo(31337,ktz,helloworld@example.com,None) take 5 seconds
