Scala flatMap and Parallel Programming

We use flatMap and map all the time in Scala. With Future, too, we chain computations with flatMap, map, and so on. But does such a chain run in parallel? It does not. When you chain monads, each step runs only after the previous one finishes, so the chain is sequential.

Future chaining is not parallel

Let’s look at an example. Suppose there are some database-dependent functions like the ones below.

case class UserInfo(userId: Int, nickname: String, emailAddress: String, account: Option[Long])

def getNicknameById(userId: Int): Future[String] = Future {
  Thread.sleep(2000)
  "ktz"
}

def getEmailAddressByNickname(nickname: String): Future[String] = Future {
  Thread.sleep(2000)
  "helloworld@example.com"
}

def getAccountInfoByNickname(nickname: String): Future[Option[Long]] = Future {
  Thread.sleep(3000)
  None
}

def printInterval(userInfo: UserInfo, start: Instant, end: Instant): String =
  s"$userInfo take ${(end.toEpochMilli - start.toEpochMilli) / 1000} seconds"

val userId: Int = 31337

These take 2, 2, and 3 seconds respectively. We want to build a UserInfo by calling them. The straightforward way is a for comprehension.

/**
 * Just use for comprehension
 */
val start1: Instant = Instant.now
val fUserInfo1 = for {
  nickname <- getNicknameById(userId)
  emailAddr <- getEmailAddressByNickname(nickname)
  accountInfo <- getAccountInfoByNickname(nickname)
} yield UserInfo(userId, nickname, emailAddr, accountInfo)
val userInfo1 = Await.result(fUserInfo1, 10.seconds)
val end1: Instant = Instant.now
println(printInterval(userInfo1, start1, end1)) // UserInfo(31337,ktz,helloworld@example.com,None) take 7 seconds

As you can see, it takes 7 seconds (2 + 2 + 3). The calls do not run in parallel.
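To see why the chain is sequential, it helps to look at roughly what a for comprehension over Future expands to. The sketch below uses a hypothetical `slowCall` helper with short delays (it is not one of the functions from this post). Each inner call sits inside the previous step's callback, so it cannot start until the previous value exists.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for a slow database call.
def slowCall(result: String, millis: Long): Future[String] = Future {
  Thread.sleep(millis)
  result
}

// for { a <- slowCall("a", 100); b <- slowCall("b", 100); c <- slowCall("c", 100) } yield a + b + c
// is rewritten by the compiler into nested flatMap/map calls, roughly:
val chained: Future[String] =
  slowCall("a", 100).flatMap { a =>
    // This body only runs once `a` is available, so "b" starts after "a" ends.
    slowCall("b", 100).flatMap { b =>
      // Likewise, "c" starts only after "b" ends.
      slowCall("c", 100).map(c => a + b + c)
    }
  }

val chainedResult: String = Await.result(chained, 5.seconds)
```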

How to be parallel?

by function

There are two easy ways to run the calls in parallel. The first is to write a helper function, like this.

/**
 * Calling first and use next
 */
def getEmailAndAccountTuple(fNickname: Future[String]): Future[(String, Option[Long])] =
  fNickname.flatMap(nickname => {
    // Both futures are created here, so both are already running
    // before we chain them together on the last line.
    val fEmailAddr = getEmailAddressByNickname(nickname)
    val fAccountInfo = getAccountInfoByNickname(nickname)
    fEmailAddr.flatMap(email => fAccountInfo.map(account => (email, account)))
  })

val start2: Instant = Instant.now
val fNickname: Future[String] = getNicknameById(userId)
val fUserInfo2 = for {
  nickname <- fNickname
  (email, accountInfo) <- getEmailAndAccountTuple(fNickname)
} yield UserInfo(userId, nickname, email, accountInfo)
val userInfo2 = Await.result(fUserInfo2, 10.seconds)
val end2: Instant = Instant.now
println(printInterval(userInfo2, start2, end2)) // UserInfo(31337,ktz,helloworld@example.com,None) take 5 seconds

It takes 5 seconds (2 seconds + max(2 seconds, 3 seconds)). That is because both futures are created, and therefore already running, before they are chained. After the 2 seconds spent getting the email address, we only wait 1 more second for accountInfo, because the two ran in parallel.
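The key point is that a Scala Future starts running as soon as it is created, not when it is chained. Here is a minimal sketch of that idea, using a hypothetical `fetch` helper with short delays in place of the real calls. Both futures are bound to vals first, so by the time the for comprehension chains them they are already in flight.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for a slow call; it returns its label after a delay.
def fetch(label: String, millis: Long): Future[String] = Future {
  Thread.sleep(millis)
  label
}

val startedAt: Long = System.nanoTime()

// Created (and therefore started) here, before any chaining happens.
val fEmail: Future[String] = fetch("email", 200)
val fAccount: Future[String] = fetch("account", 300)

// The for comprehension only waits for results; it does not start the work.
val together: Future[(String, String)] = for {
  email <- fEmail
  account <- fAccount
} yield (email, account)

val pair: (String, String) = Await.result(together, 5.seconds)
val elapsedMillis: Long = (System.nanoTime() - startedAt) / 1000000
println(s"$pair took about $elapsedMillis ms")
```

If the two `fetch` calls were written directly inside the for comprehension instead, the second would not start until the first had finished. The exact elapsed time depends on the machine and on how many threads the execution context has available.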

by using the Cartesian product and applicative

But we had to write an extra function, which is annoying. With cats we can do this more simply.

/**
 * Use cats applicative
 */
import cats.syntax.all._
import cats.instances.future._

val start3: Instant = Instant.now
val fUserInfo3 = for {
  nickname <- getNicknameById(userId)
  (email, account) <- (getEmailAddressByNickname(nickname) |@| getAccountInfoByNickname(nickname)).tupled
} yield UserInfo(userId, nickname, email, account)
val userInfo3: UserInfo = Await.result(fUserInfo3, 10.seconds)
val end3: Instant = Instant.now
println(printInterval(userInfo3, start3, end3)) // UserInfo(31337,ktz,helloworld@example.com,None) take 5 seconds

This also takes 5 seconds, but it is simpler than writing a helper function.
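If you would rather not depend on cats, the standard library's `Future.zip` gives the same shape. The sketch below is an assumption-laden variation, not the code from this post: the `lookup*` functions are hypothetical stand-ins with short delays. As far as I know, newer cats versions also dropped `|@|` in favor of tuple syntax such as `(fa, fb).tupled`, so check the version you use.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-ins for the database calls.
def lookupNickname(userId: Int): Future[String] = Future {
  Thread.sleep(100)
  "ktz"
}
def lookupEmail(nickname: String): Future[String] = Future {
  Thread.sleep(100)
  "helloworld@example.com"
}
def lookupAccount(nickname: String): Future[Option[Long]] = Future {
  Thread.sleep(150)
  None
}

val profile: Future[(String, String, Option[Long])] =
  lookupNickname(31337).flatMap { nickname =>
    // Both futures are created before zip combines them,
    // so the email and account lookups run at the same time.
    lookupEmail(nickname).zip(lookupAccount(nickname)).map {
      case (email, account) => (nickname, email, account)
    }
  }

val profileResult: (String, String, Option[Long]) = Await.result(profile, 5.seconds)
```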

The snippets in this post assume the following imports:

import java.time.Instant
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

A Few Tips for Using Zipkin

Zipkin is a very useful tracing solution when building microservices. (It is also useful for any other service, because it measures the latency between two services.) It is becoming more and more popular, but there is still little documentation. This post collects a few tips from adopting Zipkin in production.

Zipkin with Finatra

Zipkin was originally built by Twitter, and it supports Finatra (which is based on Twitter Server and Finagle) well. Integrating Zipkin with Finatra is very simple.

1. Add a dependency to the project (libraryDependencies in build.sbt), where finagleVersion is the Finagle version you use.


libraryDependencies += "com.twitter" %% "finagle-zipkin" % finagleVersion

2. After that, add Twitter flags when running the Finatra server. You can list the available Twitter flags by appending ‘-help’ to the jar or SBT run command. The flags related to Zipkin are:

-thrift.name='thrift': Thrift server name // When Thrift Server
or
-http.name='http': Http server name // When Http Server
-com.twitter.finagle.zipkin.host='localhost:1463': Host to scribe traces to
-com.twitter.finagle.zipkin.initialSampleRate='0.001': Initial sample rate

That’s it! Once you append the Twitter flags to your application’s command line, it works. Sometimes, though, you need to check whether the flags were applied. You can check in the Twitter Server admin interface, at the URL ‘admin/clients/zipkin-tracer’.

[Screenshot: zipkin-tracer client page in the Twitter Server admin]

If the Zipkin server is set correctly, you will see its address in ‘Set(Inet({Server Address}, Map()))’.

Add Twitter flag in SBT

To pass Twitter flags through sbt run, quote them together with the run command like this:

$ sbt 'run -help'

Existing instrumentations

Zipkin supports many frameworks and languages. If you are looking for a supported framework, you can find the list here.

Scribe

On the page above, there are two main kinds of protocol: HTTP and Scribe. The Zipkin server supports both. I was very curious what Scribe is. You can see the description here. Simply put, Scribe is a logging protocol based on Thrift.

com.twitter:finagle-zipkin

finagle-zipkin comes in two versions. One is openzipkin/zipkin-finagle and the other is finagle-zipkin in twitter/finagle. What is the difference? openzipkin/zipkin-finagle is maintained by the openzipkin group and lets you choose between HTTP and Scribe. finagle-zipkin in twitter/finagle is a little different: it supports only Scribe. So if you want to connect it with Finatra, you have to run a Zipkin server with Scribe support enabled.

Ports

Zipkin needs two ports. One is 9411, which receives HTTP requests. The other is 9410, which serves Scribe. Port 9410 is only opened when you enable Scribe support.

Implicit in Scala

In Scala, implicits are very important. But they can be ambiguous and hard to understand, and they are part of the learning curve for entering the Scala world. This post covers the basics of Scala implicits.

Implicits are resolved by the compiler. When it hits certain errors, for example an instance whose type is not compatible, or a missing implicit parameter, the compiler looks for an implicit conversion or an implicit instance. This post is about defining such implicit conversions and instances.

Implicit Conversion

implicit def

We can convert an instance to another type automatically by using implicits. First, we can use an implicit conversion. Suppose we have some types like these.

trait Printable {
  def print: String
}

case class PrintableClass(value: String) extends Printable {
  override def print: String = s"value: ${value.toString}"
}

case class ValueCaseClass(value: String)

If we want to convert ‘ValueCaseClass’ to ‘PrintableClass’, we can use an implicit def like this.

implicit def convertValueCaseClassToPrintable(valueCaseClass: ValueCaseClass): PrintableClass =
  PrintableClass(valueCaseClass.value)

val printableClass: PrintableClass = ValueCaseClass("Implicit Def") // by using convertValueCaseClassToPrintable
println(printableClass.print)

In this code,


val printableClass: PrintableClass = ValueCaseClass("Implicit Def")

printableClass is declared as PrintableClass, but we assign a ValueCaseClass to it. On its own that is a compile error. But the compiler looks for an implicit conversion, finds the implicit definition ‘convertValueCaseClassToPrintable’, and uses it to get a PrintableClass.

implicit class

You can also convert by using an implicit class.

implicit class ConvertableValueCaseClass(valueCaseClass: ValueCaseClass) {
  def toPrintable: PrintableClass = PrintableClass(valueCaseClass.value)
}

val printableClass2: PrintableClass = ValueCaseClass("Implicit class").toPrintable // by using implicit class
println(printableClass2.print)

In this code,


val printableClass2: PrintableClass = ValueCaseClass("Implicit class").toPrintable

ValueCaseClass doesn’t have toPrintable. The compiler looks for an implicit class, finds ‘ConvertableValueCaseClass’, and uses its ‘toPrintable’.
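It can help to write out by hand what the compiler inserts in both cases. The following is a minimal sketch with hypothetical types `Raw` and `Pretty` (stand-ins for the classes above, so the snippet is self-contained). The "expanded" lines show roughly the explicit calls the implicit versions turn into.

```scala
import scala.language.implicitConversions

// Hypothetical types used only for this illustration.
case class Raw(value: String)
case class Pretty(value: String)

// Implicit conversion: used when a Raw appears where a Pretty is expected.
implicit def rawToPretty(raw: Raw): Pretty = Pretty(raw.value)

// Implicit class: adds the extension method toPretty to Raw.
implicit class RawOps(raw: Raw) {
  def toPretty: Pretty = Pretty(raw.value)
}

val viaImplicitDef: Pretty = Raw("a")                      // what we write
val viaImplicitDefExpanded: Pretty = rawToPretty(Raw("a")) // roughly what the compiler inserts

val viaImplicitClass: Pretty = Raw("b").toPretty                      // what we write
val viaImplicitClassExpanded: Pretty = new RawOps(Raw("b")).toPretty  // roughly what the compiler inserts
```

The practical difference is that the implicit def converts silently at an assignment or argument position, while the implicit class only kicks in when you call a method that the original type does not have.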

Implicit Instance

In Scala, a def can take implicit parameters.

def print(implicit printableClass: PrintableClass): Unit = {
  println(printableClass.print)
}

implicit val printableClass: PrintableClass = PrintableClass("Implicit Val")
print

In the code above, ‘print’ needs a ‘printableClass’ parameter. The compiler automatically passes the implicit instance ‘printableClass’.
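An implicit parameter is still an ordinary parameter, so you can also pass it explicitly when you want to override what the compiler would pick. A small sketch, with a hypothetical `Greeting` type and `greet` function that are not part of the example above:

```scala
// Hypothetical type and function, for illustration only.
case class Greeting(text: String)

def greet(name: String)(implicit greeting: Greeting): String =
  s"${greeting.text}, $name"

implicit val defaultGreeting: Greeting = Greeting("Hello")

val implicitCall: String = greet("ktz")                 // compiler passes defaultGreeting
val explicitCall: String = greet("ktz")(Greeting("Hi")) // we pass the argument ourselves
```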

Implicit Order

Above, we saw how to get an implicit instance, and I think it is very useful. An implicit instance can come from several places. Let’s see an example.

object ImplicitOrder extends App {
  implicit val implicitIntInstance1: Instance[String] = Instance("Implicit Instance in Same Conjure")

  def getString(implicit implicitInstance: Instance[String]): String = implicitInstance.value

  import Imported.implicitIntInstance1

  println(getString)
}

case class Instance[T](value: T)

object Instance {
  implicit val implicitIntInstance1: Instance[String] = Instance("Implicit Instance in Companion Object")
}

object Imported {
  implicit val implicitIntInstance1: Instance[String] = Instance("Implicit Instance in imported")
}

Can you guess which string is printed to the console? The answer is ‘Implicit Instance in Same Conjure’. The lookup order for implicits is:

  1. The same scope (local definitions)
  2. Imported instances
  3. The companion object
  4. Default imports

So, if we delete

implicit val implicitIntInstance1: Instance[String] = Instance("Implicit Instance in Same Conjure")

from ImplicitOrder, “Implicit Instance in imported” will be printed. And if we also delete

import Imported.implicitIntInstance1

, “Implicit Instance in Companion Object” will be printed.

Default implicit instance by companion object.

Let’s look more closely at implicit instances in the companion object. We can provide a default instance by defining an implicit instance in the companion object.

case class Hello(value: String)

trait Printer[T] {
  def print(value: T): String
}

object Printer {
  implicit val IntPrinter: Printer[Int] = new Printer[Int] {
    override def print(value: Int): String = s"Type: Int - $value"
  }
  implicit val PersonPrinter: Printer[Hello] = new Printer[Hello] {
    override def print(value: Hello): String = s"Type: Hello - $value"
  }
}

def print[T](value: T)(implicit printer: Printer[T]): String = printer.print(value)

val int: Int = 1
val hello: Hello = Hello("ktz")

print(int) // Type: Int - 1
print(hello) // Type: Hello - Hello(ktz)

object InjectedObject {
  implicit val injectedIntPrinter: Printer[Int] = new Printer[Int] {
    override def print(value: Int): String = s"Injected - Type: Int - $value"
  }
}

import InjectedObject.injectedIntPrinter

print(int) // Injected - Type: Int - 1

In the code above, look at the first two calls:


print(int) // Type: Int - 1
print(hello) // Type: Hello - Hello(ktz)

Here the function ‘print’ is passed the default implicit instances defined in the companion object:


implicit val IntPrinter: Printer[Int] = new Printer[Int] {
  override def print(value: Int): String = s"Type: Int - $value"
}

implicit val PersonPrinter: Printer[Hello] = new Printer[Hello] {
  override def print(value: Hello): String = s"Type: Hello - $value"
}

But the next ‘print(int)’ is passed ‘injectedIntPrinter’. In this way, we can inject any instance just by importing it.

Tagged Type in Scala

Humans always make mistakes, and software engineers are no exception. Suppose we have some code like this.

val userId: Long = 12
val deviceUuid: Long = 1234
val carSerialId: Long = 12345

def getHashCode(
  userId: Long,
  deviceUuid: Long,
  carSerialId: Long
): String = {
  val userIdPlus: Long = userId + 1
  val deviceUuidPlus: Long = deviceUuid + 2
  val carSerialIdPlus: Long = carSerialId + 3
  s"$userIdPlus-$deviceUuidPlus-$carSerialIdPlus"
}

getHashCode(userId, deviceUuid, carSerialId) // Right Answer = res0: String = 13-1236-12348
getHashCode(deviceUuid, userId, carSerialId) // Wrong Answer = res1: String = 1235-14-12348

Two function calls


getHashCode(userId, deviceUuid, carSerialId) // Right Answer = res0: String = 13-1236-12348

getHashCode(deviceUuid, userId, carSerialId) // Wrong Answer = res1: String = 1235-14-12348

both compile. The first one gives the right answer, but the second is wrong. So what is a good way to keep myself from making this mistake? At first glance, wrapping each value in a case class looks like a good way.

case class UserIdCaseClass(userId: Long)
case class DeviceUuidCaseClass(deviceUuid: Long)
case class CarSerialIdCaseClass(carSerialId: Long)

def getHashCodeCaseClass(
  userIdCaseClass: UserIdCaseClass,
  deviceUuidCaseClass: DeviceUuidCaseClass,
  carSerialIdCaseClass: CarSerialIdCaseClass
): String = {
  val userIdPlus: Long = userIdCaseClass.userId + 1
  val deviceUuidPlus: Long = deviceUuidCaseClass.deviceUuid + 2
  val carSerialIdPlus: Long = carSerialIdCaseClass.carSerialId + 3
  s"$userIdPlus-$deviceUuidPlus-$carSerialIdPlus"
}

getHashCodeCaseClass(UserIdCaseClass(userId), DeviceUuidCaseClass(deviceUuid), CarSerialIdCaseClass(carSerialId)) // Right Answer = res2: String = 13-1236-12348
// Commented out because it does not compile, which is exactly what we want:
//getHashCodeCaseClass(DeviceUuidCaseClass(deviceUuid), UserIdCaseClass(userId), CarSerialIdCaseClass(carSerialId)) // Not Even Compiled!

This way, we can reduce mistakes. But we lose many things. First, we have to write a lot of boilerplate: a case class for every id and every other Long value. We also lose the fact that an id is a Long. So if we want to use userId, carSerialId, or deviceUuid as a number, we have to access the member, and we cannot assign the wrapper to a Long variable. To solve these problems, we can use a tagged type.

Tagged Type

A tagged type attaches a tag to some type A and is defined as a subtype of A. For example, if we want userId to be a subtype of Long, we can declare UserId as a tagged type of Long. Here is an example.

import shapeless.tag
import shapeless.tag.@@

trait UserIdTag
trait DeviceUuidTag
trait CarSerialIdTag

type UserId = Long @@ UserIdTag
type DeviceUuid = Long @@ DeviceUuidTag
type CarSerialId = Long @@ CarSerialIdTag

def getHashCodeTagged(userId: UserId, deviceUuid: DeviceUuid, carSerialId: CarSerialId): String = {
  val userIdPlus: Long = userId + 1
  val deviceUuidPlus: Long = deviceUuid + 2
  val carSerialIdPlus: Long = carSerialId + 3
  s"$userIdPlus-$deviceUuidPlus-$carSerialIdPlus"
}

val taggedUserId: UserId = tag[UserIdTag][Long](userId)
val taggedDeviceUuid: DeviceUuid = tag[DeviceUuidTag][Long](deviceUuid)
val taggedCarSerialId: CarSerialId = tag[CarSerialIdTag][Long](carSerialId)

getHashCodeTagged(taggedUserId, taggedDeviceUuid, taggedCarSerialId) // Right Answer = res2: String = 13-1236-12348
// Commented out because it does not compile, which is exactly what we want:
//getHashCodeTagged(taggedDeviceUuid, taggedCarSerialId, taggedUserId) // Not Even Compiled

Now UserId, DeviceUuid, and CarSerialId are subtypes of Long, so we can use them as Long. For example, we can add a number to a UserId with the ‘+’ operator, or assign it to a Long. But because each is its own type, a DeviceUuid cannot be passed where a UserId is expected.

Tagged Type Erasure

Sometimes you want to overload a function on tagged types like this.

def getId(userId: UserId): Long = userId + 1
def getId(deviceUuid: DeviceUuid): Long = deviceUuid + 2 // Not Compiled because tagged type erased after compile
def getId(carSerialId: CarSerialId): Long = carSerialId + 3 // Not Compiled because tagged type erased after compile

But if you overload like this, the compiler rejects it. Tagged types are erased at compile time, so all three methods end up with the same signature taking a Long. Then how can we solve the problem? You can solve it by using Either.

def getUserIdOrDeviceIdOrCarSerialId(
  id: Either[UserId, Either[DeviceUuid, CarSerialId]]
): Long = id match {
  case Left(userId) => userId + 1
  case Right(Left(deviceUuid)) => deviceUuid + 2
  case Right(Right(carSerialId)) => carSerialId + 3
}

getUserIdOrDeviceIdOrCarSerialId(Left(taggedUserId))
getUserIdOrDeviceIdOrCarSerialId(Right(Left(taggedDeviceUuid)))
getUserIdOrDeviceIdOrCarSerialId(Right(Right(taggedCarSerialId)))

Of course, you can also use Coproduct from shapeless.

import shapeless.{Coproduct, CNil, :+:, Inl, Inr}

type Id = UserId :+: DeviceUuid :+: CarSerialId :+: CNil

def getId(id: Id): Long = id match {
  case Inl(userId) => userId + 1
  case Inr(Inl(deviceUuid)) => deviceUuid + 2
  case Inr(Inr(Inl(carSerialId))) => carSerialId + 3
  case Inr(Inr(Inr(cNil))) => 0
}

getId(Coproduct[Id](taggedUserId))
getId(Coproduct[Id](taggedDeviceUuid))
getId(Coproduct[Id](taggedCarSerialId))

This way, you can keep yourself from making mistakes.


Play with Lagom (0) – What is (Not Completed)

As you can see above, this post is not complete. As I study Lagom, I will be able to say more about these topics and fill it in.

These days, Internet technology keeps growing and many buzzwords have sprung up around it: ‘Reactive’, ‘Micro Service’, ‘Event Sourcing’, ‘CQRS’… Many so-called ‘progressive companies’ shout these buzzwords, and I also want to know about them. Since moving to Kakao, I have mostly used Play, Akka, and Finatra. (Personally, I think Finatra is a very good framework for building productive micro servers.) I am looking for a good solution to manage the micro services in our service. This series is a very short journey through Lagom. (I just want a taste of Lagom. Of course, if I decide it is the real solution for our service, it will be a long journey 🙂 )

What is Reactive?

One of the strongest trends these days is ‘Reactive’. You can read and sign the ‘Reactive Manifesto’ here. OK, after signing the manifesto, let’s summarize what a Reactive System is.

  • Reactive is Responsive. The service has to be responsive at all times.
  • Reactive is Resilient. The service has to stay responsive even when a failure occurs.
    • This means that if an error occurs, it is isolated and does not affect other components or services.
  • Reactive is Elastic. The service has to scale elastically as the load on the system changes.
    • This means that when a service is under load, for example, the Akka system assigns more threads to the loaded actor.
  • Reactive is Message Driven. A reactive system calls remote systems by sending messages to a target.

Reactive Programming vs Reactive System.

This topic is not directly related to the main subject, but it is very important to distinguish between Reactive Programming and a Reactive System. What follows is just a summary of this article.

A reactive system is a way to build a service. The description of Reactive above is about reactive systems. A reactive system passes messages to a target. With message passing and mailboxes, we can assign more resources to a particular component (or micro server), and we can scale horizontally or vertically.

Reactive programming is a way to handle events. We take events from an event stream and handle them by chaining functions. This is event-driven, in contrast to a reactive system, which operates by message passing. Reactive programming is usually async (in fact, it has to be async), so while waiting we can hand computing resources to other jobs that need them.

Reactive Programming fits well with a Reactive System. The wording is awkward, but the idea is natural. Reactive Programming tightly couples events to the computations chained on them, which makes it less resilient, and a Reactive System is the key to resolving this problem.

What is CQRS?

CQRS is very simple. In a service, we separate Command (write) from Query (read). Martin Fowler describes it very well in this article.

It is like a read replica in SQL: the application sends writes and reads to different APIs or servers.

[Figure: CQRS diagram, from Martin Fowler's CQRS article]

As you can see, the Service Interface (API Gateway) sends reads to the Query Model and writes to the Command Model.
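To make the split concrete, here is a minimal in-memory sketch. Everything in it (`UserQueryModel`, `UserCommandModel`, `rename`, `nameOf`) is hypothetical and invented for illustration; a real system would typically put the two models behind separate APIs or stores and update the read side asynchronously.

```scala
import scala.collection.mutable

// Query model: serves reads only; it is updated by projecting accepted writes into it.
final class UserQueryModel {
  private val namesById = mutable.Map.empty[Int, String]
  def project(userId: Int, name: String): Unit = namesById.update(userId, name)
  def nameOf(userId: Int): Option[String] = namesById.get(userId)
}

// Command model: validates writes and, if they are accepted, updates the query model.
final class UserCommandModel(queryModel: UserQueryModel) {
  def rename(userId: Int, name: String): Either[String, Unit] =
    if (name.isEmpty) Left("name must not be empty")
    else Right(queryModel.project(userId, name))
}

val queries = new UserQueryModel
val commands = new UserCommandModel(queries)

val accepted: Either[String, Unit] = commands.rename(31337, "ktz") // write path
val rejected: Either[String, Unit] = commands.rename(31337, "")    // rejected by validation
val nameNow: Option[String] = queries.nameOf(31337)                // read path
```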

What is Event Sourcing?

Event Sourcing stores data as a sequence of events. Think of it like this. We bring some data ‘a’ to 3 by adding 1 and then 2. In a common architecture, we only store that a is 3. But in Event Sourcing, we store that

  1. an ‘add 1’ event occurred,
  2. an ‘add 2’ event occurred.

So we can derive that a is 3. It also means that we can recover the state of the system when a was 1, or when a was 0. This lets us restore the service to any point in time and any state.
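The ‘a’ example can be sketched in a few lines. The names here (`Event`, `Added`, `replay`) are made up for illustration, and the initial value of a is assumed to be 0. The state is never stored directly; it is rebuilt by folding over the event log, and replaying only a prefix of the log gives an earlier state.

```scala
// Hypothetical event type: the only thing that can happen to `a` is an addition.
sealed trait Event
final case class Added(amount: Int) extends Event

// Rebuild the state by replaying events from the initial state 0.
def replay(events: Seq[Event]): Int =
  events.foldLeft(0) { case (state, Added(amount)) => state + amount }

val eventLog: Vector[Event] = Vector(Added(1), Added(2))

val currentState: Int = replay(eventLog)            // all events applied
val afterFirstEvent: Int = replay(eventLog.take(1)) // state after 'add 1' only
val initialState: Int = replay(eventLog.take(0))    // state before any event
```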

Scala Implicitly

As I study Scala, I often see implicitly. But every time I saw it, I just ignored it, thinking “It’s syntactic sugar for implicit”. This time, I am writing this post to really understand it.

In Scala, implicitly is just this.


def implicitly[T](implicit e: T) = e

That’s all! implicitly is a function that takes an implicit instance e and returns it. Then what is the advantage of using implicitly?
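One common place you will meet implicitly is with type classes from the standard library, such as `Ordering`. The sketch below uses a hypothetical `largest` function: it asks for an `Ordering[A]` through a context bound and then retrieves it by type with implicitly.

```scala
// Summon the standard library's implicit Ordering[Int] by type.
val intOrdering: Ordering[Int] = implicitly[Ordering[Int]]

// Hypothetical helper: [A: Ordering] requires an implicit Ordering[A],
// and implicitly[Ordering[A]] fetches it inside the body.
def largest[A: Ordering](values: List[A]): A =
  values.reduce((a, b) => if (implicitly[Ordering[A]].gteq(a, b)) a else b)

val largestInt: Int = largest(List(3, 1, 2))
val largestString: String = largest(List("b", "c", "a"))
```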

Example 1

You can use an implicit parameter like this.

implicit val optionInt: Option[Int] = Some(1)
implicit val optionBoolean: Option[Boolean] = Some(true)

def getImplicitInt(implicit oInt: Option[Int]): Int = oInt.get
def getImplicitBoolean(implicit oBoolean: Option[Boolean]): Boolean = oBoolean.get

getImplicitInt
// res0: Int = 1
getImplicitBoolean
// res1: Boolean = true

Now, you can do the same by using implicitly.


implicit val optionInt: Option[Int] = Some(1)
implicit val optionBoolean: Option[Boolean] = Some(true)

def getImplicitlyA[A: Option]: A = implicitly[Option[A]].get

getImplicitlyA[Int]
// res2: Int = 1
getImplicitlyA[Boolean]
// res3: Boolean = true

implicitly is useful in this kind of situation.

implicitly pairs naturally with a context bound, which works with types that take one type parameter.

As you can see above, ‘implicitly[Option[Int]]’ gets the implicit instance of type ‘Option[Int]’. Option takes one type parameter, here Int.

Example 2

Then how about implicitly for a type that has no type parameter? For example, with an implicit parameter we can do this.


implicit val string: String = "Hello"
implicit val boolean: Boolean = true
implicit val int: Int = 0

def getImplicitT[T](implicit t: T): T = t

getImplicitT[Int]
// res0: Int = 0

getImplicitT[String]
// res1: String = Hello

getImplicitT[Boolean]
// res2: Boolean = true

But if you try the same with implicitly, an error occurs.


def getImplicitlyT[T]: T = implicitly[T]

getImplicitlyT[Int]
getImplicitlyT[String]
getImplicitlyT[Boolean]

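// compile error: not enough arguments for method implicitly: (implicit e: T)T.
// Unspecified value parameter e.
//   def getImplicitlyT[T]: T = implicitly[T]

The error occurs because getImplicitlyT declares no implicit parameter, so inside its body there is no implicit value of the abstract type T in scope. (Called directly on a concrete type, implicitly[Int] works fine.) A context bound would supply that parameter, but a context bound needs a type with one type parameter. So how can we use implicitly in this situation? The answer is ‘make an Id type’.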
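To check where the implicit value has to come from, here is a small sketch. The names `defaultPort` and `viaImplicitParam` are hypothetical. It shows that implicitly works on a plain type like Int when called directly, and that inside a generic function it works as soon as the function itself declares the implicit parameter.

```scala
// Hypothetical implicit value of a type with no type parameter.
implicit val defaultPort: Int = 8080

// Called directly on a concrete type: the compiler finds defaultPort.
val direct: Int = implicitly[Int]

// Inside a generic function, T is abstract, so an implicit T must be
// brought into scope by the function's own implicit parameter.
def viaImplicitParam[T](implicit t: T): T = implicitly[T]

val throughFunction: Int = viaImplicitParam[Int]
```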


type Id[A] = A

def getImplicitlyT[T : Id]: T = implicitly[Id[T]]

getImplicitlyT[Int]
// res3: Int = 0

getImplicitlyT[String]
// res4: String = Hello

getImplicitlyT[Boolean]
// res5: Boolean = true

I made a new type Id that takes one type parameter, and Id[A] is the same as A. So you can use it just like the implicit version.

Context bound

The function ‘getImplicitlyA’ has the type parameter [A: Option]. It looks like a type bound, but it is called a context bound. A type bound is written with ‘<:’. You can read about type bounds in Scala School. A context bound does not mean that A is an Option. Instead, it requires an implicit value of type Option[A] to be in scope.
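A context bound is shorthand for an extra implicit parameter list. The sketch below uses a hypothetical `Show` type class (not from this post) and writes the same function both ways, so the two forms can be compared side by side.

```scala
// Hypothetical type class with one type parameter.
trait Show[A] {
  def show(a: A): String
}

implicit val showInt: Show[Int] = new Show[Int] {
  def show(a: Int): String = s"Int($a)"
}

// With a context bound: the implicit Show[A] has no name, so we fetch it with implicitly.
def describe[A: Show](a: A): String = implicitly[Show[A]].show(a)

// Roughly what the context bound expands to: a named implicit parameter.
def describeExplicit[A](a: A)(implicit ev: Show[A]): String = ev.show(a)

val described: String = describe(1)
val describedExplicit: String = describeExplicit(1)
```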

// Context bound
implicit val optionInt: Option[Int] = Some(1)
implicit val optionBoolean: Option[Boolean] = Some(true)

def getImplicitOptionInt(implicit oInt: Option[Int]): Int = oInt.get
def getImplicitOptionBoolean(implicit oBoolean: Option[Boolean]): Boolean = oBoolean.get

getImplicitOptionInt
getImplicitOptionBoolean

def getImplicitlyOptionA[A: Option]: A = implicitly[Option[A]].get

getImplicitlyOptionA[Int]
getImplicitlyOptionA[Boolean]

// Type bound
trait Parent {
  def sayHello: String
}

trait Son extends Parent {
  override def sayHello: String = "Hello! I'm Son!"
}

trait Daughter extends Parent {
  override def sayHello: String = "Hello! I'm Daughter!"
}

def sayHello[A <: Parent](a: A): Unit = println(a.sayHello)

sayHello(new Son {})
sayHello(new Daughter {})

Monitoring Micro Service With Zipkin

This post is best read alongside the example. You can clone it from my GitHub: https://github.com/ktz-boilerplate/finatra-zipkin-example.

 

Micro services are becoming a trendy technology. But there are many things to consider when migrating a service to micro services. One of them is latency. We have to monitor each micro service and find out which one is the bottleneck and why. Zipkin can be a reasonable answer.

Zipkin

Zipkin is a monitoring solution for latency. It traces each micro service and calculates latencies, so that we can find which micro service takes a lot of time and deal with it. It is based on Google Dapper.

Example

Start Zipkin on Docker

First, we will try Zipkin on Docker. If you don’t have Docker installed locally, please install it. You can see how in the Docker documentation.

Now, let’s start Zipkin on Docker.

$ docker run -d -p 9411:9411 -p 9410:9410 -e SCRIBE_ENABLED=true openzipkin/zipkin

Port 9411 is the Zipkin web UI and port 9410 is the Scribe port. You have to enable the Scribe port with SCRIBE_ENABLED=true so that Zipkin accepts Thrift. Once Zipkin is up on Docker, you can see the web UI here.

Zipkin Twitter Flag

Before starting the servers, let’s look at the Twitter flags related to Zipkin. If your server does not support these flags, you have to upgrade Finatra.

-http.name: HTTP server name displayed in Zipkin.

-thrift.name: Thrift server name displayed in Zipkin.

-com.twitter.finagle.zipkin.host: Zipkin host and port number to send trace spans (data) to.

-com.twitter.finagle.zipkin.initialSampleRate: Sample rate, from 0 to 1.0. A value of 1.0 means every request is traced and sent to Zipkin.

Start Finatra HTTP Server

Now, go to the project directory. You can see 5 sub-projects. One of them is the Finatra HTTP server. Let’s start the HTTP server first.

$ sbt 'project http-server' "run -http.name=http-server -com.twitter.finagle.zipkin.host=localhost:9410 -com.twitter.finagle.zipkin.initialSampleRate=1"

Start Finatra Thrift Server

Now, let’s start another server, the Thrift server.

$ sbt 'project thrift-server' "run -thrift.name=thrift-server -com.twitter.finagle.zipkin.host=localhost:9410 -com.twitter.finagle.zipkin.initialSampleRate=1"

Query Something

Now connect to the HTTP server and send some queries. These are the URLs you can test.

get

  • /users
  • /user/:userId
  • /user/car/
  • /user/:userId/car

post

  • /user

For example, to get all users, call http://localhost:8080/users.

After some queries, you can see tracing status in Zipkin Web UI like this.

zipkinui1.png

By default, traces are sorted by longest latency first.

 

You can also see the dependencies between services like this.

dependency.png

Applicative in Scala Cats

I had always heard about Applicative, but I didn’t know what it really is. So I wrote this post to make it clear to myself. The picture below is from adit.io and the examples are from Mastering Advanced Scala.

What is Applicative?

While programming, you will encounter situations like this.

def getSome(a: Int): Option[Int] = Some(a)
def getNone(a: Int): Option[Int] = None
def add(a: Int, b: Int): Int = a + b

val aOpt = getSome(1)
val bOpt = getSome(2)

I want to add aOpt and bOpt together using add, but both are Option values. The usual way is to use flatMap and map like this.


aOpt.flatMap(a => bOpt.map(b => add(a, b)))
// res0: Option[Int] = Some(3)

This is where we can use Applicative.

What is Applicative?

I am not sure that my understanding of Applicative is right, but I think Applicative is this:

applicative_just.png

A very famous picture about Applicative.

Here there is a wrapped value, 2, and a wrapped function. We unwrap both the function and the value, apply the function to the value, and wrap the result again. We can apply the same idea to the example above.


import cats.instances.option._
import cats.Applicative

Applicative[Option].map2(aOpt, bOpt)(add)
// res1: Option[Int] = Some(3)
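
To see what map2 does with the "unwrap, apply, wrap again" idea, here is a minimal sketch that uses only the standard library. MiniApplicative and optionInstance are hypothetical names I made up for illustration; this is not how Cats defines Applicative, only the same idea in a small form.

```scala
// Hypothetical, simplified Applicative (not the Cats definition).
trait MiniApplicative[F[_]] {
  // Wrap a plain value.
  def pure[A](a: A): F[A]
  // Apply a wrapped function to a wrapped value.
  def ap[A, B](ff: F[A => B])(fa: F[A]): F[B]

  def map[A, B](fa: F[A])(f: A => B): F[B] =
    ap[A, B](pure(f))(fa)

  // map2 is built from map and ap: partially apply f inside F, then apply again.
  def map2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C] =
    ap[B, C](map[A, B => C](fa)(a => (b: B) => f(a, b)))(fb)
}

val optionInstance: MiniApplicative[Option] = new MiniApplicative[Option] {
  def pure[A](a: A): Option[A] = Some(a)
  def ap[A, B](ff: Option[A => B])(fa: Option[A]): Option[B] =
    (ff, fa) match {
      case (Some(f), Some(a)) => Some(f(a)) // unwrap both, apply, wrap again
      case _                  => None       // anything missing gives None
    }
}

def add(a: Int, b: Int): Int = a + b

val sum     = optionInstance.map2(Option(1), Option(2))(add)
val missing = optionInstance.map2(Option(1), Option.empty[Int])(add)
```

Here sum is Some(3), and missing is None because one of the two inputs is empty.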

We can also write it like this.


import cats.syntax.all._

(aOpt |@| bOpt).map(add)

Monad extends Applicative


@typeclass trait Monad[F[_]] extends FlatMap[F] with Applicative[F] {
  override def map[A, B](fa: F[A])(f: A => B): F[B] =
  flatMap(fa)(a => pure(f(a)))
}

As you can see, Monad extends Applicative, and Applicative extends Functor. So the hierarchy is ‘Monad <: Applicative <: Functor’.
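
One way to see why every Monad is also an Applicative: if you have pure and flatMap, you can write map and ap with them. The sketch below is my own simplified version (MiniMonad and listMonad are hypothetical names, not Cats types) and uses only the standard library.

```scala
// Hypothetical, simplified Monad (not the Cats definition).
trait MiniMonad[F[_]] {
  def pure[A](a: A): F[A]
  def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]

  // Functor's map, derived from flatMap and pure.
  def map[A, B](fa: F[A])(f: A => B): F[B] =
    flatMap(fa)(a => pure(f(a)))

  // Applicative's ap, derived from flatMap and map.
  def ap[A, B](ff: F[A => B])(fa: F[A]): F[B] =
    flatMap(ff)((f: A => B) => map(fa)(f))
}

val listMonad: MiniMonad[List] = new MiniMonad[List] {
  def pure[A](a: A): List[A] = List(a)
  def flatMap[A, B](fa: List[A])(f: A => List[B]): List[B] = fa.flatMap(f)
}

// Each wrapped function is applied to each wrapped value.
val applied = listMonad.ap(List[Int => Int](_ + 1, _ * 10))(List(1, 2))
```

applied is List(2, 3, 10, 20): first _ + 1 over both values, then _ * 10 over both values.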

Tip – Traverse

While studying Mastering Advanced Scala, I found a tip in the Applicative chapter, so I am writing it down. Suppose there is a List[Int] and you want to apply the ‘getSome’ function to every element. For example:


val ints = List(1,2,3,4,5)
ints.map(getSome)
// res3: List[Option[Int]] = List(Some(1), Some(2), Some(3), Some(4), Some(5))

The result is ‘List(Some(1), Some(2), Some(3), Some(4), Some(5))’. If you want ‘Some(List(1, 2, 3, 4, 5))’ instead, you can use Traverse like this.


import cats.Traverse
import cats.instances.list._
import cats.instances.option._

Traverse[List].traverse(ints)(getSome)

// res4: Option[List[Int]] = Some(List(1, 2, 3, 4, 5))

This returns None if the function returns None for any element.


def getSomeOrNone(a: Int): Option[Int] =
  if(a % 2 == 0) Some(a)
  else None

Traverse[List].traverse(ints)(getSomeOrNone)

// res5: Option[List[Int]] = None
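
If you wonder what traverse does inside, below is a rough sketch for the List and Option case using only the standard library. traverseOption is a hypothetical helper I wrote for illustration; the Cats implementation is more general and is written differently.

```scala
// Hypothetical helper: traverse specialised to List and Option.
def traverseOption[A, B](as: List[A])(f: A => Option[B]): Option[List[B]] =
  as.foldRight(Option(List.empty[B])) { (a, acc) =>
    // Combine the result for this element with the results collected so far.
    // If either one is None, the whole result becomes None.
    for {
      b  <- f(a)
      bs <- acc
    } yield b :: bs
  }

def getSome(a: Int): Option[Int] = Some(a)
def getSomeOrNone(a: Int): Option[Int] =
  if (a % 2 == 0) Some(a) else None

val allSome = traverseOption(List(1, 2, 3, 4, 5))(getSome)
val hasNone = traverseOption(List(1, 2, 3, 4, 5))(getSomeOrNone)
```

allSome is Some(List(1, 2, 3, 4, 5)) and hasNone is None, the same as the Traverse results above.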

 


Cartesian Product in Scala Cats

At work, I had a problem handling three futures at the same time. The first future returns a long value. The second and third futures each take that value as a parameter, and I want their results as a tuple.

The code is like this.


for {
  long <- futureLong()
  int <- futureInt(long)
  char <- futureChar(long)
} yield (int, char)

Of course, a for comprehension runs step by step: futureInt is called after futureLong finishes, and futureChar is called after futureInt finishes. So it takes too long to get the final value.

So, I changed the code like this.


val longVal: Future[Long] = futureLong()

val intVal: Future[Int] = for {
  long <- futureLong()
  int <- futureInt(long)
} yield int

val charVal: Future[Char] = for {
  long <- futureLong()
  char <- futureChar(long)
} yield char

val forComp: (Int, Char) = Await.result(for {
  int <- intVal
  char <- charVal
} yield (int, char), 5 second)

Yes, it works as I want. But there is duplicated code such as ‘long <- futureLong()’. After I had thought about it for a long time, my coworker Liam recommended using the Cartesian product.

What is Cartesian Product?

The Cartesian product is very simple: it is the set of all ordered pairs taken from two sets. For example, if there are two sets A = {1,2,3} and B = {‘a’, ‘b’, ‘c’}, the Cartesian product A X B is

{(1, ‘a’), (1, ‘b’), (1, ‘c’), (2, ‘a’), (2, ‘b’), (2, ‘c’), (3, ‘a’), (3, ‘b’), (3, ‘c’)}

That’s all. Of course, the Cartesian product is not commutative. For example, the Cartesian product B X A is

{(‘a’, 1), (‘a’, 2), (‘a’, 3), (‘b’, 1), (‘b’, 2), (‘b’, 3), (‘c’, 1), (‘c’, 2), (‘c’, 3)}

which is different from A X B.
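
Before going to Cats, the same sets can be written with a plain for comprehension over List. This is only a standard-library sketch, and the names aTimesB and bTimesA are mine.

```scala
val as: List[Int]  = List(1, 2, 3)
val bs: List[Char] = List('a', 'b', 'c')

// A X B: every element of A paired with every element of B.
val aTimesB: List[(Int, Char)] = for { a <- as; b <- bs } yield (a, b)

// B X A: the same pairs, but with the components in the other order.
val bTimesA: List[(Char, Int)] = for { b <- bs; a <- as } yield (b, a)
```

Both lists have 9 pairs, but aTimesB contains (1, 'a') while bTimesA contains ('a', 1), so the two products are different.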

Cartesian Product in cats

Above example can be implemented in Scala.


import cats.instances.list._
import cats.syntax.all._

val ints: List[Int] = List(1, 2, 3, 4)
val chars: List[Char] = List('a', 'b', 'c')

(ints |@| chars).tupled
// res0: List[(Int, Char)] = List((1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c), (4,a), (4,b), (4,c))
(chars |@| ints).tupled
// res1: List[(Char, Int)] = List((a,1), (a,2), (a,3), (a,4), (b,1), (b,2), (b,3), (b,4), (c,1), (c,2), (c,3), (c,4))

(ints |@| chars).map(_ + _.toString)
// res2: List[String] = List(1a, 1b, 1c, 2a, 2b, 2c, 3a, 3b, 3c, 4a, 4b, 4c)

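In Cats, the Cartesian product syntax is ‘|@|’ (some people call it Oink, and others call it Pizza box). If you call tupled, it returns a List of tuples. You can also map over it. Note that newer Cats versions deprecate ‘|@|’ in favor of tuple syntax such as (ints, chars).tupled and (ints, chars).mapN.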

Cartesian Product in Future

OK, let’s return to the previous problem. I need to call futureInt and futureChar at the same time, and using three for comprehensions is too messy. So, how can I solve this with the Cartesian product? It’s very simple.


for {
  long <- futureLong()
  (int, char) <- (futureInt(long) |@| futureChar(long)).tupled
} yield (int, char)

Very simple, right? You can test whether this expression really runs both futures at the same time.

package org.example.ktz.blog

import cats.syntax.all._
import cats.instances.future._
import org.scalatest._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

class CartesianProductTest extends FlatSpec with Matchers {

  def futureLong(): Future[Long] = Future {
    Thread.sleep(2000)
    println("Future Long Done!")
    2
  }

  def futureInt(long: Long): Future[Int] = Future {
    Thread.sleep(2000)
    println("Future Int Done!")
    long.toInt
  }

  def futureChar(long: Long): Future[Char] = Future {
    Thread.sleep(2000)
    println("Future Char Done!")
    long.toChar
  }

  "Concurrent" should "for comprehension" in {
    val forComp: (Int, Char) = Await.result(for {
      long <- futureLong()
      int <- futureInt(long)
      char <- futureChar(long)
    } yield (int, char), 5 second)
    // java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
  }

  "Concurrent" should "for comprehension2" in {
    val longVal: Future[Long] = futureLong()
    val intVal: Future[Int] = for {
      long <- futureLong()
      int <- futureInt(long)
    } yield int
    val charVal: Future[Char] = for {
      long <- futureLong()
      char <- futureChar(long)
    } yield char
    val forComp: (Int, Char) = Await.result(for {
      int <- intVal
      char <- charVal
    } yield (int, char), 5 second)
    // Work well 🙂
  }

  "Concurrent" should "cartesian" in {
    val cartesion: (Int, Char) = Await.result(for {
      long <- futureLong()
      (int, char) <- (futureInt(long) |@| futureChar(long)).tupled
    } yield (int, char), 5 second)
    // Work well 🙂
  }
}

In this test, the first test throws a TimeoutException, while the second and third tests pass. So we can conclude that the last expression also runs futureInt and futureChar at the same time.
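
The reason this works is that a Future starts running as soon as it is created, so creating futureInt and futureChar together (before combining them) is enough. As a sketch without Cats, the standard library's Future.zip gives a similar shape. The three functions below are shortened copies of the ones in the test (200 ms instead of 2 seconds, which is my assumption to keep the example quick).

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Shortened stand-ins for the functions in the test above.
def futureLong(): Future[Long] = Future { Thread.sleep(200); 2L }
def futureInt(long: Long): Future[Int] = Future { Thread.sleep(200); long.toInt }
def futureChar(long: Long): Future[Char] = Future { Thread.sleep(200); long.toChar }

val combined: Future[(Int, Char)] =
  futureLong().flatMap { long =>
    // Both futures are created here, so both are already running
    // when zip pairs their results.
    futureInt(long).zip(futureChar(long))
  }

val (int, char) = Await.result(combined, 5.seconds)
```

With enough threads in the execution context, the total time is roughly futureLong plus the slower of the other two, not the sum of all three.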

Writer Monad in cats

I already wrote a post about the Reader Monad, and now it’s time to write about the Writer Monad. At first, I thought the Writer Monad was the opposite of the Reader Monad, but it is quite different. Let’s say that the Writer Monad is…

The Writer Monad accumulates something (such as a log) while computing a value.

Hm… I don’t think that is a very good summary. If any reader has a nicer one, please leave a comment. The example below is from Mastering Advanced Scala.

Example

The Writer type is defined in Cats like this.

type Writer[L, V] = WriterT[Id, L, V]

As you can see, the Writer Monad takes two type parameters, L and V. L is the type of the accumulated value (the log), and V is the type of the result (return) value. The Id position in WriterT is the effect that wraps the result. Here it is Id, so you can ignore it.

You can use the Writer Monad simply like this.

import cats.data.Writer

def greetW(name: String, logged: Boolean): Writer[List[String], String] =
  Writer(List("Compose a greeting"), {
    val userName = if(logged) name else "User"
    s"Hello $userName"
  })

def isLoggedW(name: String): Writer[List[String], Boolean] =
  Writer(List("Checking if user is logged in"), name.length == 3)

import cats.instances.list._

val name = "Joe"

val resultW: Writer[List[String], String] = for {
  logged <- isLoggedW(name)
  greeting <- greetW(name, logged)
} yield greeting
val (log, result) = resultW.run

// log: List[String] = List(Checking if user is logged in, Compose a greeting)
// result: String = Hello Joe

Yes, it’s very simple. It just accumulates strings in L (List[String]) and returns the value in V (String). I think it’s very useful.

How does it work?

Of course, you may be curious about how it works. As you can see, nothing in the ‘for comprehension’ passes a List[String] around. But after the for comprehension finishes, it produces log: List[String] and result: String.

Let’s remember this.

A for comprehension is syntactic sugar for flatMap (and map).

You can also write it like this.

val flatMapResult: Writer[List[String], String] =
  isLoggedW(name).flatMap(logged => greetW(name, logged))

It gives the same result as the for comprehension.

OK, it’s flatMap. Then let’s dive into WriterT.flatMap.

final case class WriterT[F[_], L, V](run: F[(L, V)]) {
        ...
  def flatMap[U](f: V => WriterT[F, L, U])(implicit flatMapF: FlatMap[F], semigroupL: Semigroup[L]): WriterT[F, L, U] =
    WriterT {
      flatMapF.flatMap(run) { lv =>
        flatMapF.map(f(lv._2).run) { lv2 =>
          (semigroupL.combine(lv._1, lv2._1), lv2._2)
        }
      }
    }
        ...
}

In this code, the only line that matters is the one that calls semigroupL.combine. There, the two List[String] values are combined, and this is where the accumulation happens. WriterT differs from other types in that its run value has type F[(L, V)], and flatMap operates on that pair. That is how the List[String] accumulates.
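
To make the accumulation easier to see, here is a stripped-down sketch with no F and with the log fixed to List[String]. MiniWriter is a hypothetical class I wrote for this post, not a Cats type; it only keeps the part of flatMap that combines the two logs.

```scala
// Hypothetical simplified Writer: log type fixed to List[String], no effect F.
final case class MiniWriter[V](run: (List[String], V)) {
  def flatMap[U](f: V => MiniWriter[U]): MiniWriter[U] = {
    val (log1, v) = run
    val (log2, u) = f(v).run
    // Same role as semigroupL.combine in WriterT: append the second log to the first.
    MiniWriter((log1 ++ log2, u))
  }

  def map[U](f: V => U): MiniWriter[U] =
    MiniWriter((run._1, f(run._2)))
}

def isLogged(name: String): MiniWriter[Boolean] =
  MiniWriter((List("Checking if user is logged in"), name.length == 3))

def greet(name: String, logged: Boolean): MiniWriter[String] = {
  val userName = if (logged) name else "User"
  MiniWriter((List("Compose a greeting"), s"Hello $userName"))
}

val (log, result) = (for {
  logged   <- isLogged("Joe")
  greeting <- greet("Joe", logged)
} yield greeting).run
```

Nothing in the for comprehension touches the log, yet log ends up holding both messages in order, because flatMap appends them behind the scenes.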

WriterT Example

WriterT is the monad transformer for Writer. So, you can use it like this.

import cats.data.WriterT
import cats.instances.future._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// The (log, value) pair is written as an explicit tuple instead of relying on auto-tupling.
def greetWFuture(name: String, logged: Boolean): WriterT[Future, List[String], String] =
  WriterT(Future((List("Compose a greeting"), {
    val userName = if(logged) name else "User"
    s"Hello $userName"
  })))

def isLoggedWFuture(name: String): WriterT[Future, List[String], Boolean] =
  WriterT(Future((List("Checking if user is logged in"), name.length == 3)))

val resultWFuture: WriterT[Future, List[String], String] = for {
  logged <- isLoggedWFuture(name)
  greeting <- greetWFuture(name, logged)
} yield greeting

val futureResult: Future[(List[String], String)] = resultWFuture.run

import scala.concurrent.Await
import scala.concurrent.duration._

val (logAwait, resultAwait) = Await.result(futureResult, 2 second)

It’s very simple. With WriterT, we can use the Writer Monad together with another monad such as Future.
