# Process Module

Processor is build on top of Akka actor model to help build a message based system. To know more go through akka actor system: https://doc.akka.io/docs/akka/2.5/guide/actors-intro.html

# Creating a processor module in smile

  • Open Mould.ksmile file and define the processor module
process-module LearningProcess at com.metastay.learningprocess
  • Run smile command in sbt
  • refresh the project in eclipse. You will see the LearningProcess.kprocess file generated

# Types of triggers for a Processor

# Message

  • Open LearningProcess.kprocess file in eclipse and define a messages
message SayHello(greeting:String)
  • Now define a processor which accepts the above message
processor Actor {
    accept message-ref SayHello 
}
  • Define the reaction of the processor on receiving this message in ActorProcessorCode.scala
class ActorProcessorCode extends ActorProcessor.Consumers {

 override def handleException(message: Message, t: Throwable, retryCount: Int, sender: Option[ActorRef]): ProcessorEHStrategy = {
   println(s"Exception in Actor processor on message: $message error: ${t.stackTraceString(50)}")
   ProcessorEHStrategy.Stop
 }

 override def process(message: com.metastay.learningprocess.message.SayHello, sender: ActorRef): Unit = {
   println(s"Actor: Hello, ${message.greeting} ${sender.path.name}!")
 }
}
  • Let’s write a simple test to see the message being responded
class ActorProcessorTestSuite extends FunSuite with SmilePerTest {

 test("simple processor", Tag("hello")) {
    grab[com.metastay.learningprocess.processor.actor.ActorProcessorFixture]
     .send(message = SayHello(greeting = "Good morning"))
     .run()
     .printMessageFlowList
 }
}
  • To drop a message to the processor
grab[ActorProcessor].send(message = SayHello(greeting = "Hello there!"))

# Timer

  • Define a timer, edit LearningProcess.kprocess file in eclipse
publishable message DrinkWater
timer [DrinkWater] EveryHour in Asia/Calcutta {
    from 7:00 to 19:00 every 1 hour
}
  • Subscribe to the timer message
processor Actor {
    accept message-ref SayHello 
    subscribe message-ref DrinkWater
}
  • compile
  • Define the reaction to the timer in ActorProcessorCode.scala
override def process(message: DrinkWater.type): Unit = {
    println(s"Hey! Time for some water!")
}
  • You can see the message printed once the processor starts.
  • Write a small test to start & stop the processor & the timer system, have a wait in between to see the messages getting printed
class ActorProcessorTestSuite extends FunSuite with SmilePerTest {

    test("run a processor", Tag("timer")) {
       val processor = grab[ActorProcessor]
       val timerSystem = new TimerSystem("LearningProcess")
       processor.start()
       timerSystem.start()
       waitForEnterKeyStroke()
       timerSystem.stop()
       processor.stop()
    }
}
  • Test:compile & run the test, since the interval is long you could alter it for the test
  • Edit the timer
timer [DrinkWater] EveryHour in Asia/Calcutta {
    from 7:00 to 19:00 every 10 second //1 hour
}
  • Run the test & revert the timer code.

NOTE

every processor defined gets started by default when the server starts.

# Event

  • To subscribe to an event, add the stream module’s dependency to processor module. Edit Mould.ksmile
process-module LearningProcess(LearningStream) at com.metastay.learningprocess
  • Run smile command
  • Now create a processor & subscribe to an event. Edit LearningProcess.kprocess file
processor Enroller {
    subscribe event-ref LearningStream::EmployeeRegistered
}
  • Compile
  • Lets keep it simple and just print a message as a response for now:
class EnrollerProcessorCode extends EnrollerProcessor.Consumers {

   override def handleException(message: Message, t: Throwable, retryCount: Int, sender: Option[ActorRef]): ProcessorEHStrategy =
     ProcessorEHStrategy.Ignore

   override def process(event: com.metastay.leaningstream.stream.EmployeeRegistered): Unit = {
     println(s"Welcome aboard ${event.name}!")
   }

}
  • Compile
  • Run server & register an employee, the message should appear on the console.
  • Alternately write a simple test to see how this works

# Reactor

This is a specific type of a processor who only reacts/subscribes to events of a specific stream. Its is a durable-message processor, ie keeps a counter (snapshot) of the event numbers it has processed until.

  • Define a reactor in LearningProcess.kprocess
reactor[LearningStream] CouponDispatcher {
    EmployeeRegistered
}
  • Compile
  • Testing a processor
  • Assignment