# Process Module
Processor is build on top of Akka actor model to help build a message based system. To know more go through akka actor system: https://doc.akka.io/docs/akka/2.5/guide/actors-intro.html
# Creating a processor module in smile
- Open
Mould.ksmilefile and define the processor module
process-module LearningProcess at com.metastay.learningprocess
- Run smile command in sbt
- refresh the project in eclipse.
You will see the
LearningProcess.kprocessfile generated
# Types of triggers for a Processor
# Message
- Open
LearningProcess.kprocessfile in eclipse and define a messages
message SayHello(greeting:String)
- Now define a processor which accepts the above message
processor Actor {
accept message-ref SayHello
}
- Define the reaction of the processor on receiving this message in
ActorProcessorCode.scala
class ActorProcessorCode extends ActorProcessor.Consumers {
override def handleException(message: Message, t: Throwable, retryCount: Int, sender: Option[ActorRef]): ProcessorEHStrategy = {
println(s"Exception in Actor processor on message: $message error: ${t.stackTraceString(50)}")
ProcessorEHStrategy.Stop
}
override def process(message: com.metastay.learningprocess.message.SayHello, sender: ActorRef): Unit = {
println(s"Actor: Hello, ${message.greeting} ${sender.path.name}!")
}
}
- Let’s write a simple test to see the message being responded
class ActorProcessorTestSuite extends FunSuite with SmilePerTest {
test("simple processor", Tag("hello")) {
grab[com.metastay.learningprocess.processor.actor.ActorProcessorFixture]
.send(message = SayHello(greeting = "Good morning"))
.run()
.printMessageFlowList
}
}
- To drop a message to the processor
grab[ActorProcessor].send(message = SayHello(greeting = "Hello there!"))
# Timer
- Define a timer, edit
LearningProcess.kprocessfile in eclipse
publishable message DrinkWater
timer [DrinkWater] EveryHour in Asia/Calcutta {
from 7:00 to 19:00 every 1 hour
}
- Subscribe to the timer message
processor Actor {
accept message-ref SayHello
subscribe message-ref DrinkWater
}
- compile
- Define the reaction to the timer in
ActorProcessorCode.scala
override def process(message: DrinkWater.type): Unit = {
println(s"Hey! Time for some water!")
}
- You can see the message printed once the processor starts.
- Write a small test to start & stop the processor & the timer system, have a wait in between to see the messages getting printed
class ActorProcessorTestSuite extends FunSuite with SmilePerTest {
test("run a processor", Tag("timer")) {
val processor = grab[ActorProcessor]
val timerSystem = new TimerSystem("LearningProcess")
processor.start()
timerSystem.start()
waitForEnterKeyStroke()
timerSystem.stop()
processor.stop()
}
}
- Test:compile & run the test, since the interval is long you could alter it for the test
- Edit the timer
timer [DrinkWater] EveryHour in Asia/Calcutta {
from 7:00 to 19:00 every 10 second //1 hour
}
- Run the test & revert the timer code.
NOTE
every processor defined gets started by default when the server starts.
# Event
- To subscribe to an event, add the stream module’s dependency to processor module. Edit
Mould.ksmile
process-module LearningProcess(LearningStream) at com.metastay.learningprocess
- Run smile command
- Now create a processor & subscribe to an event. Edit
LearningProcess.kprocessfile
processor Enroller {
subscribe event-ref LearningStream::EmployeeRegistered
}
- Compile
- Lets keep it simple and just print a message as a response for now:
class EnrollerProcessorCode extends EnrollerProcessor.Consumers {
override def handleException(message: Message, t: Throwable, retryCount: Int, sender: Option[ActorRef]): ProcessorEHStrategy =
ProcessorEHStrategy.Ignore
override def process(event: com.metastay.leaningstream.stream.EmployeeRegistered): Unit = {
println(s"Welcome aboard ${event.name}!")
}
}
- Compile
- Run server & register an employee, the message should appear on the console.
- Alternately write a simple test to see how this works
# Reactor
This is a specific type of a processor who only reacts/subscribes to events of a specific stream. Its is a durable-message processor, ie keeps a counter (snapshot) of the event numbers it has processed until.
- Define a reactor in LearningProcess.kprocess
reactor[LearningStream] CouponDispatcher {
EmployeeRegistered
}
- Compile
- Testing a processor
- Assignment