Thursday, 19 April 2012

Akka-Camel 2.1 consumer/producer example.

import akka.camel.{Consumer, CamelMessage}
import akka.actor.{Props, ActorSystem}
import akka.util.Timeout
import akka.util.duration._
import akka.pattern.ask
import scaladays2012.{Email, EmailerConfig, Emailer}
class HttpConsumerExample extends Consumer{
def endpointUri = "jetty://http://0.0.0.0:1111/demo"
implicit val timeout = Timeout(30 seconds)
val emailer = context.actorOf(Props(new Emailer(EmailerConfig(System.getProperty("gmail.password")))), "Emailer")
protected def receive = {
case msg : CamelMessage => {
val name = msg.header("name").getOrElse("Stranger")
val message = "Hello %s from ScalaDays 2012!" format name
sender ! message
for( email <- msg header "email" ) {
emailer ? Email("piotrga@gmail.com", email.toString, message, "hello") // ignoring the response
}
}
}
}
import akka.actor.Actor
import akka.camel.{Failure, CamelMessage, Producer}
case class EmailerConfig(gmailPassword: Required[String])
case class Email(from: String, to: String, subject: String, body: String)
class Emailer(cfg: EmailerConfig) extends Actor with Producer{
def endpointUri = "smtps://smtp.gmail.com:465?username=peter@scala-experts.com&password=%s&debugMode=false&defaultEncoding=utf-8" format cfg.gmailPassword.value
override protected def transformOutgoingMessage(msg: Any) = msg match {
case Email(from, to, subject, body) =>
new CamelMessage(body, Map("from" -> from, "to" -> to, "subject"-> subject))
}
override protected def transformResponse(msg : Any) = msg match {
case resp: Failure => akka.actor.Status.Failure(resp.getCause)
case _ => msg
}
}
object HttpConsumerApp extends App {
val sys = ActorSystem("test")
sys.actorOf(Props[HttpConsumerExample], "HttpConsumer")
sys.awaitTermination()
}
view raw gistfile1.scala hosted with ❤ by GitHub

Tuesday, 6 March 2012

Atomic update of AtomicReference

Problem

I like the AtomicReference class. When used with a immutable class, it offers quite a nice and efficient abstraction for concurrent data structure. One thing it is missing, in my opinion, is the atomic update method, ie.
  val list = new AtomicReference(List(1,2,3))
  list.update(list => list.map(_ *2))

Solution

You just need to import the Atomic class, and it will pimp the AtomicReference, so you can call the update method. See below:
import annotation.tailrec
import java.util.concurrent.atomic.AtomicReference
object Atomic {
def apply[T]( obj : T) = new Atomic(new AtomicReference(obj))
implicit def toAtomic[T]( ref : AtomicReference[T]) : Atomic[T] = new Atomic(ref)
}
class Atomic[T](val atomic : AtomicReference[T]) {
@tailrec
final def update(f: T => T) : T = {
val oldValue = atomic.get()
val newValue = f(oldValue)
if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f)
}
def get() = atomic.get()
}
view raw gistfile1.scala hosted with ❤ by GitHub
Enjoy!

Monday, 30 January 2012

Managing of opening and closing multiple resources automatically in Scala

Problem

Managing opening and closing multiple resource automatically. For example when you want to copy a file you need to open and close both input and output files.

Solution

I recently came across scala-arm library written by Josh Suereth. I liked the idea so much that I wanted to experiment with writing something similar, but a bit simpler myself.

Here is what I came up with:

import io.Source
import java.io.FileWriter
object ManagedDemo extends App{
import Managed._
import Managed.{managed => ®}
for (
fw <- ®(new FileWriter("target/test-out.txt"));
fr <- ®(Source.fromFile("build.sbt"))
){
fw.write(fr.mkString)
}
println("written:\n"+ ®(Source.fromFile("target/test-out.txt")).map(_.mkString))
}
trait Managed{
def close() : Unit
}
object Managed{
implicit def close2Managed[A <: { def close() : Any }](c:A) : Managed = Managed( c.close() )
implicit def dispose2Managed[A <: { def dispose() : Any }](c:A) : Managed = Managed( c.dispose() )
private def apply(closeResource : => Unit) : Managed = new Managed {
def close() : Unit = closeResource
}
def managed[A <% Managed](res: => A) = new Traversable[A]{
def foreach[U](f: (A) => U) {
val closable : A = res
try{
f(closable)
}finally{
closable.close()
}
}
}
}
view raw gistfile1.scala hosted with ❤ by GitHub

Explanation

The magic is in the foreach method and the way the for and map work. They both use the foreach method. We are using this fact and wrapping the iteration with try-finally block, so we can call the close or dispose method when the traversal is finished.

The rest of the magic are just implicit conversions between anything which contains def close() : Any or def dispose() : Any and Managed trait.

Enjoy!

Sunday, 8 January 2012

Stop your services gracefully in Scala with tryAll

object ErrorUtils{
/**
* Executes a block and returns the result wrapped by Right class, or exception wrapped by Left class.
*/
def either[T](block:() => T) : Either[Throwable,T] = try {Right(block())} catch {case e => Left(e)}
/**
* Executes all blocks in order and collects exceptions. It guarantees to execute all blocks, even if some of them fail.
* It throws a BlockException, if any number of blocks fail. BlockException contains a list of thrown exceptions.
*
* <br/><br/>Example:
<pre>
tryAll(
service1.stop,
service2.shutdown,
service3.kill
)
</pre>
*
* @return nothing
* @throws BlockException if any number of blocks fail. BlockException contains a list of thrown exceptions.
*
*/
def tryAll(block1 : => Unit, block2 : => Unit = {}, block3 : => Unit= {}, block4 : => Unit = {}, block5 : => Unit = {}, block6 : => Unit = {}) = {
val blocks = List(()=>block1, ()=>block2, ()=>block3, ()=>block4, ()=>block5, ()=>block6)
val errors = blocks.toList.map(either(_)).filter(_.isLeft).map{case Left(e) => e}
if (!errors.isEmpty) throw new BlockException(errors)
}
case class BlockException(errors : List[Throwable]) extends RuntimeException("There were errors while executing blocks: "+errors)
}
view raw gistfile1.scala hosted with ❤ by GitHub

Sunday, 1 January 2012

Mocking in Scala with ScalaTest and Mockito

It turnes out that mocking with Mockito in Scala is very simple. The only issue is the eq method which I had to map to the.
import org.mockito.Matchers.{eq => the, any}

Dependencies

To use ScalaTest with Mockito you need the following dependencies:
  "org.scalatest" %% "scalatest" % "1.6.1" % "test",
  "org.mockito" % "mockito-core" % "1.9.0" % "test",
Have a look at the example:
package akka.camel
import akka.actor._
import org.scalatest.FlatSpec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.mock.MockitoSugar
import org.mockito.Mockito._
import org.mockito.Matchers.{eq => the, any}
class ConsumerScalaTest extends FlatSpec with ShouldMatchers with MockitoSugar{
val system = ActorSystem("test")
"Consumer" should "register itself with Camel during initialization" in{
val mockCamel = mock[ConsumerRegistry]
class TestActor extends Actor with Consumer {
override val camel = mockCamel
from("file://abc")
protected def receive = { case _ => println("foooo..")}
}
system.actorOf(Props(new TestActor()))
verify(mockCamel).registerConsumer(the("file://abc"), any[TestActor])
}
}
view raw gistfile1.scala hosted with ❤ by GitHub