akka是什么? akka是一个基于actor并发模型的框架,该模型可以追溯到1973年发表的论文《A Universal Modular Actor Formalism for Artificial Intelligence 》, Actor就源于该模型。
actor通过消息传递的方式与外界通信。消息传递是异步的。每个actor都有一个邮箱,该邮箱接收并缓存其他actor发过来的消息,actor一次只能同步处理一个消息,处理消息过程中,除了可以接收消息,不能做任何其他操作。
<!— more —>
Akka的哲学是基于End to End Argument这个事实的,即:在分布式环境中,reliable的通信需要在application层来确保,中间件层或者infrastructure只能提供效率上的帮助,而绝对无法保证reliability。
akka是一个基于actor模型的框架,提供了极致的异步并发处理性能,但是分布式条件下的可靠性需要使用者来保障
akka模块
以下列出了akka核心模块,所有akka的核心功能模块均遵循开源软件协议(Open Source Software (OSS)), Lightbend同时也提供商业版本,提供基于akka的增强功能。
-
akka library
- How to build and design high-performance, concurrent applications.
- How to handle errors in a multi-threaded environment.
- How to protect my project from the pitfalls of concurrency.
-
remoting
- How to address actor systems living on remote hosts.
- How to address individual actors on remote actor systems.
- How to turn messages to bytes on the wire.
- How to manage low-level, network connections (and reconnections) between hosts, detect crashed actor systems and hosts, all transparently.
- How to multiplex communications from an unrelated set of actors on the same network connection, all transparently.
-
Cluster
- How to maintain a set of actor systems (a cluster) that can communicate with each other and consider each other as part of the cluster.
- How to introduce a new system safely to the set of already existing members.
- How to reliably detect systems that are temporarily unreachable.
- How to remove failed hosts/systems (or scale down the system) so that all remaining members agree on the remaining subset of the cluster.
- How to distribute computations among the current set of members.
- How to designate members of the cluster to a certain role, in other words, to provide certain services and not others.
-
Cluster Sharding
- How to model and scale out a large set of stateful entities on a set of systems.
- How to ensure that entities in the cluster are distributed properly so that load is properly balanced across the machines.
- How to ensure migrating entities from a crashed system without losing the state.
- How to ensure that an entity does not exist on multiple systems at the same time and hence keeps consistent.
-
Cluster Singleton
- How to ensure that only one instance of a service is running in the whole cluster.
- How to ensure that the service is up even if the system hosting it currently crashes or shuts down during the process of scaling down.
- How to reach this instance from any member of the cluster assuming that it can migrate to other systems over time.
-
Persistence
- How to restore the state of an entity/actor when system restarts or crashes.
- How to implement a CQRS system.
- How to ensure reliable delivery of messages in face of network errors and system crashes.
- How to introspect domain events that have led an entity to its current state.
- How to leverage Event Sourcing in your application to support long-running processes while the project continues to evolve.
-
projections
- Constructing alternate or aggregate views over an event stream.
- Propagating an event stream onto another downstream medium such as a Kafka topic.
- A simple way of building read-side projections in the context of Event Sourcing and CQRS system
-
Distributed Data
- How to accept writes even in the face of cluster partitions.
- How to share data while at the same time ensuring low-latency local read and write access.
-
Streams
- How to handle streams of events or large datasets with high performance, exploiting concurrency and keeping resource usage tight.
- How to assemble reusable pieces of event/data processing into flexible pipelines.
- How to connect asynchronous services in a flexible way to each other with high performance.
- How to provide or consume Reactive Streams compliant interfaces to interface with a third party library.
-
Alpakka Alpakka is a separate module from Akka
- Connecting various infrastructure or persistence components to Stream based flows.
- Connecting to legacy systems in a manner that adheres to a Reactive Streams API.
-
HTTP Akka HTTP is a separate module from Akka.
- How to expose services of a system or cluster to the external world via an HTTP API in a performant way.
- How to stream large datasets in and out of a system using HTTP.
- How to stream live events in and out of a system using HTTP.
-
gRPC Akka gRPC is a separate module from Akka.
- Exposing services with all the benefits of gRPC & protobuf:
- Schema-first contract
- Schema evolution support
- Efficient binary protocol
- First-class streaming support
- Wide interoperability
- Use of HTTP/2 connection multiplexing
如果需要使用akka内部的module,需要添加如下依赖
val AkkaVersion = "2.5.21"
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % AkkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-typed" % AkkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-singleton" % AkkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-projection-core" % AkkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-typed" % AkkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % AkkaVersion
actor层次结构

- / 称作root guardian,是系统内所有actor的parent, 当一个系统被关闭时,它也是最后一个被停止的actor
- /system 称作system guardian. Akka 或者其它的内置libraries可能会在此创建actor在system的命名空间下
- /user 称作user guardian. 应用内创建的actor都会在它下面
无论何时terminate actor, 它的子actor会首先递归停止. 停止一个子actor可以通过在其parent actor调用 context.stop(childRef),
object StartStopActor1 {
def apply(): Behavior[String] =
Behaviors.setup(context => new StartStopActor1(context))
}
class StartStopActor1(context: ActorContext[String]) extends AbstractBehavior[String]{
println("first started")
context.spawn(StartStopActor2(), "second")
override def onMessage(msg: String): Behavior[String] =
msg match {
case "stop" => Behaviors.stopped
}
override def onSignal: PartialFunction[Signal, Behavior[String]] = {
case PostStop =>
println("first stopped")
this
}
}
object StartStopActor2 {
def apply(): Behavior[String] =
Behaviors.setup(new StartStopActor2(_))
}
class StartStopActor2(context: ActorContext[String]) extends AbstractBehavior[String] {
println("second started")
override def onMessage(msg: String): Behavior[String] = {
// no messages handled by this actor
Behaviors.unhandled
}
override def onSignal: PartialFunction[Signal, Behavior[String]] = {
case PostStop =>
println("second stopped")
this
}
}
object Main {
def apply(): Behavior[String] =
Behaviors.setup(context => new Main(context))
}
class Main(context: ActorContext[String]) extends AbstractBehavior[String](context) {
override def onMessage(msg: String): Behavior[String] =
msg match {
case "first" =>
val first = context.spawn(StartStopActor1(), "first")
println(s"fistActor: ${first}")
Thread.sleep(4000)
first ! "stop"
this
}
}
object ActorHierarchyExperiments extends App {
val testSystem = ActorSystem(Main(), "testSystem")
testSystem ! "first"
}
输出结果如下
first started
second started
second stopped
first stopped
很显然,second 首先被terminated 然后才是actor1,以上代码就说明了actor的这个特性: 在parent actor停止之前,会递归地停止它的子actor。
异常处理
和传统的防御性编程不同,Akka沿袭了Erlang的let it crash哲学,当actor内部发生异常时,并不试图捕捉异常并处理,而是重建一个新的actor, 使得整个系统在错误发生的时候可以自动恢复。
-
监护策略 在Akka中,在子actor被创建后,父actor可以成为子actor的监护者,在子actor出错时负责处理, 有一对一和一对多两种策略:
- OneForOneStrategy 只诛首恶,余者不问。只有出错的actor会被处理。Akka默认采用这种机制。
- OneForAllStrategy 城门失火,殃及池鱼。当出错时,不仅出错的actor,其兄弟actor也采用同样的策略一并处理。
-
恢复策略 actor出错时具体采用何种策略呢?共有四种:
- Stop - 停止出错的actor,不再让它处理任何消息。
- Restart - 这是默认策略,杀死旧的actor,重新创建一个新的actor。
- Resume - 忽略本次错误,恢复actor对消息的处理。
- Escalate - 交给父actor来决定处理策略。
object SupervisingActor {
def apply(): Behavior[String] =
Behaviors.setup(context => new SupervisingActor(context))
}
class SupervisingActor(context: ActorContext[String]) extends AbstractBehavior[String] {
private val child = context.spawn(
Behaviors.supervise(SupervisedActor()).onFailure(SupervisorStrategy.restart),
name = "supervised-actor")
override def onMessage(msg: String): Behavior[String] =
msg match {
case "failChild" =>
child ! "fail"
this
}
}
object SupervisedActor {
def apply(): Behavior[String] =
Behaviors.setup(context => new SupervisedActor(context))
}
class SupervisedActor(context: ActorContext[String]) extends AbstractBehavior[String] {
println("supervised actor started")
override def onMessage(msg: String): Behavior[String] =
msg match {
case "fail" =>
println("supervised actor fails now")
throw new Exception("I failed!")
}
override def onSignal: PartialFunction[Signal, Behavior[String]] = {
case PreRestart =>
println("supervised actor will be restarted")
this
case PostStop =>
println("supervised actor stopped")
this
}
}
object Main {
def apply(): Behavior[String] =
Behaviors.setup(context => new Main(context))
}
class Main(context: ActorContext[String]) extends AbstractBehavior[String] {
override def onMessage(msg: String): Behavior[String] =
msg match {
case "first" =>
val first = context.spawn(StartStopActor1(), "first")
println(s"fistActor: ${first}")
Thread.sleep(4000)
first ! "stop"
this
case "failure"=>
val supervisingActor = context.spawn(SupervisingActor(), "supervising-actor")
supervisingActor ! "failChild"
this
}
}
object ActorHierarchyExperiments extends App {
val testSystem = ActorSystem(Main(), "testSystem")
testSystem ! "failure"
}
会看到以下输出:
supervised actor started
supervised actor fails now
supervised actor will be restarted
supervised actor started
[ERROR] [06/30/2021 12:01:41.800] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/supervising-actor/supervised-actor] Supervisor RestartSupervisor saw failure: I failed!
java.lang.Exception: I failed!
在发生failure之后,被监控的actor停止然后立刻restarted(这就是为什么会打印出supervised actor will be restarted 但是没有supervised actor stopped)
一个IoT例子

官方文档提供了一个IoT例子,在这个IoT系统中分别有三个角色DevcieActor, DeviceGroup, DeviceManager以及一哥查询功能queryActor
DeviceActor这个角色有两个任务,记录温度,读取温度。
DevcieGroup 需要保存记录在Group下的device信息(使用一个内部Map对象实现),主要任务有两个,当创建一个新的deviceActor时监控它,当收到actordevice stop消息需要从Map中移除。这个功能我们用以下代码实现
context.watchWith(deviceActor, DeviceTerminated(deviceActor, groupId, deviceId))
//Registers this actor as a Monitor for the provided ActorRef. This actor will receive the specified message when watched actor is terminated.
上述代码的含义是,当deviceActor is terminated, 当前actor会收到消息DeviceTerminated(deviceActor, groupId, deviceId)
DeviceManager和DeviceGroup任务类似,同样要按需注册groupActor以及当groupActor停止时移除
在一个非query场景下,我们考虑的状态信息时很少的,官网中是这样说的:
- Device actors return a reading, which requires no state change
- Record a temperature, which updates a single field
- Device Group actors maintain group membership by adding or removing entries from a map
然而,假设我们需要查询一个group下的所有temperature,可能会发生这些意外
- A device actor might stop and not be able to respond back with a temperature reading.
- A new device actor might start up and not be included in the query because we weren’t aware of it.
- some actors might take a long time to answer
这些情形确实客观存在,最重要的是行为状态确定化,官网给出的solution如下
- When a query arrives, the group actor takes a snapshot of the existing device actors and will only ask those actors for the temperature.
- Actors that start up after the query arrives are ignored.
- If an actor in the snapshot stops during the query without answering, we will report the fact that it stopped to the sender of the query message.
- All actors in the snapshot have either responded or have confirmed being stopped.
- We reach a pre-defined deadline.
综合以上场景,我们将定义4种devcie状态来应对query
- It has a temperature available: Temperature.
- It has responded, but has no temperature available yet: TemperatureNotAvailable.
- It has stopped before answering: DeviceNotAvailable.
- It did not respond before the deadline: DeviceTimedOut.
这个query任务也会单独作为一类actor存在,它需要获得以下信息:
- The snapshot and IDs of active device actors to query.
- The ID of the request that started the query (so that we can include it in the reply).
- The reference of the actor who sent the query. We will send the reply to this actor directly.
- A deadline that indicates how long the query should wait for replies. Making this a parameter will simplify testing.
具体实现时,通过一个set记录正在排队等待temperature的deviceActor, 一个map用来记录查询到的temperature
完整的代码
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.PostStop
import akka.actor.typed.Signal
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors, TimerScheduler}
import scala.concurrent.duration.{DurationInt, FiniteDuration}
object Device {
def apply(groupId: String, deviceId: String): Behavior[Command] =
Behaviors.setup(context => new Device(context, groupId, deviceId))
sealed trait Command
//typical Request-Response pattern
final case class ReadTemperature(requestId: Long, replyTo: ActorRef[RespondTemperature]) extends Command
final case class RespondTemperature(requestId: Long, deviceId: String, value: Option[Double])
final case class RecordTemperature(requestId: Long, value: Double, replyTo: ActorRef[TemperatureRecorded])
extends Command
final case class TemperatureRecorded(requestId: Long)
case object Passivate extends Command
}
class Device(context: ActorContext[Device.Command], groupId: String, deviceId: String)
extends AbstractBehavior[Device.Command] {
import Device._
var lastTemperatureReading: Option[Double] = None
context.log.info(s"Device actor <Latex formula="\{groupId\}-" inline />{deviceId} started" )
override def onMessage(msg: Command): Behavior[Command] = {
msg match {
case RecordTemperature(id, value, replyTo) =>
context.log.info(s"Recorded temperature reading <Latex formula="\{value\} with" inline />{id}")
lastTemperatureReading = Some(value)
replyTo ! TemperatureRecorded(id)
this
case ReadTemperature(id, replyTo) =>
replyTo ! RespondTemperature(id, deviceId, lastTemperatureReading)
this
case Passivate =>
Behaviors.stopped
}
}
override def onSignal: PartialFunction[Signal, Behavior[Command]] = {
case PostStop =>
context.log.info(s"Device actor <Latex formula="\{groupId\}-" inline />{deviceId} stopped")
this
}
}
// device group
object DeviceGroup {
def apply(groupId: String): Behavior[Command] =
Behaviors.setup(context => new DeviceGroup(context, groupId))
trait Command
private final case class DeviceTerminated(device: ActorRef[Device.Command], groupId: String, deviceId: String)
extends Command
}
class DeviceGroup(context: ActorContext[DeviceGroup.Command], groupId: String)
extends AbstractBehavior[DeviceGroup.Command] {
import DeviceGroup._
import DeviceManager.{ DeviceRegistered, ReplyDeviceList, RequestDeviceList, RequestTrackDevice, RequestAllTemperatures }
private var deviceIdToActor = Map.empty[String, ActorRef[Device.Command]]
context.log.info(s"DeviceGroup ${groupId} started")
override def onMessage(msg: Command): Behavior[Command] =
msg match {
case trackMsg @ RequestTrackDevice(`groupId`, deviceId, replyTo) =>
deviceIdToActor.get(deviceId) match {
case Some(deviceActor) =>
replyTo ! DeviceRegistered(deviceActor)
case None =>
context.log.info(s"Creating device actor for ${trackMsg.deviceId}")
val deviceActor = context.spawn(Device(groupId, deviceId), s"device-$deviceId")
context.watchWith(deviceActor, DeviceTerminated(deviceActor, groupId, deviceId))
deviceIdToActor += deviceId -> deviceActor
replyTo ! DeviceRegistered(deviceActor)
}
this
case RequestTrackDevice(gId, _, _) =>
context.log.warning(s"Ignoring TrackDevice request for <Latex formula="\{gId\}. This actor is responsible for" inline />{groupId}.")
this
case RequestDeviceList(requestId, gId, replyTo) =>
if (gId == groupId) {
replyTo ! ReplyDeviceList(requestId, deviceIdToActor.keySet)
this
} else
Behaviors.unhandled
case DeviceTerminated(_, _, deviceId) =>
context.log.info(s"Device actor for ${deviceId} has been terminated")
deviceIdToActor -= deviceId
this
case RequestAllTemperatures(requestId, gId, replyTo) =>
if (gId == groupId) {
context.spawnAnonymous(
DeviceGroupQuery(deviceIdToActor, requestId = requestId, requester = replyTo, 3.seconds))
this
} else
Behaviors.unhandled
}
override def onSignal: PartialFunction[Signal, Behavior[Command]] = {
case PostStop =>
context.log.info(s"DeviceGroup ${groupId} stopped")
this
}
}
// ---- device group
// device manager
object DeviceManager {
def apply(): Behavior[Command] =
Behaviors.setup(context => new DeviceManager(context))
sealed trait Command
final case class RequestTrackDevice(groupId: String, deviceId: String, replyTo: ActorRef[DeviceRegistered])
extends DeviceManager.Command
with DeviceGroup.Command
final case class DeviceRegistered(device: ActorRef[Device.Command])
final case class RequestDeviceList(requestId: Long, groupId: String, replyTo: ActorRef[ReplyDeviceList])
extends DeviceManager.Command
with DeviceGroup.Command
final case class ReplyDeviceList(requestId: Long, ids: Set[String])
private final case class DeviceGroupTerminated(groupId: String) extends DeviceManager.Command
final case class RequestAllTemperatures(requestId: Long, groupId: String, replyTo: ActorRef[RespondAllTemperatures])
extends DeviceGroupQuery.Command
with DeviceGroup.Command
with DeviceManager.Command
final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading])
sealed trait TemperatureReading
final case class Temperature(value: Double) extends TemperatureReading
case object TemperatureNotAvailable extends TemperatureReading
case object DeviceNotAvailable extends TemperatureReading
case object DeviceTimedOut extends TemperatureReading
}
class DeviceManager(context: ActorContext[DeviceManager.Command])
extends AbstractBehavior[DeviceManager.Command] {
import DeviceManager._
var groupIdToActor = Map.empty[String, ActorRef[DeviceGroup.Command]]
context.log.info("DeviceManager started")
override def onMessage(msg: Command): Behavior[Command] =
msg match {
case trackMsg @ RequestTrackDevice(groupId, _, replyTo) =>
groupIdToActor.get(groupId) match {
case Some(ref) =>
ref ! trackMsg
case None =>
context.log.info(s"Creating device group actor for ${groupId}")
val groupActor = context.spawn(DeviceGroup(groupId), "group-" + groupId)
context.watchWith(groupActor, DeviceGroupTerminated(groupId))
groupActor ! trackMsg
groupIdToActor += groupId -> groupActor
}
this
case req @ RequestDeviceList(requestId, groupId, replyTo) =>
groupIdToActor.get(groupId) match {
case Some(ref) =>
ref ! req
case None =>
replyTo ! ReplyDeviceList(requestId, Set.empty)
}
this
case DeviceGroupTerminated(groupId) =>
context.log.info(s"Device group actor for ${groupId} has been terminated")
groupIdToActor -= groupId
this
}
override def onSignal: PartialFunction[Signal, Behavior[Command]] = {
case PostStop =>
context.log.info("DeviceManager stopped")
this
}
}
// --device manager
// groupdevciequery
object DeviceGroupQuery {
def apply(
deviceIdToActor: Map[String, ActorRef[Device.Command]],
requestId: Long,
requester: ActorRef[DeviceManager.RespondAllTemperatures],
timeout: FiniteDuration): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
new DeviceGroupQuery(deviceIdToActor, requestId, requester, timeout, context, timers)
}
}
}
trait Command
private case object CollectionTimeout extends Command
final case class WrappedRespondTemperature(response: Device.RespondTemperature) extends Command
private final case class DeviceTerminated(deviceId: String) extends Command
}
class DeviceGroupQuery(
deviceIdToActor: Map[String, ActorRef[Device.Command]],
requestId: Long,
requester: ActorRef[DeviceManager.RespondAllTemperatures],
timeout: FiniteDuration,
context: ActorContext[DeviceGroupQuery.Command],
timers: TimerScheduler[DeviceGroupQuery.Command])
extends AbstractBehavior[DeviceGroupQuery.Command] {
import DeviceGroupQuery._
import DeviceManager.DeviceNotAvailable
import DeviceManager.DeviceTimedOut
import DeviceManager.RespondAllTemperatures
import DeviceManager.Temperature
import DeviceManager.TemperatureNotAvailable
import DeviceManager.TemperatureReading
timers.startSingleTimer(CollectionTimeout, CollectionTimeout, timeout)
private val respondTemperatureAdapter = context.messageAdapter(WrappedRespondTemperature.apply)
private var repliesSoFar = Map.empty[String, TemperatureReading]
private var stillWaiting = deviceIdToActor.keySet
deviceIdToActor.foreach {
case (deviceId, device) =>
context.watchWith(device, DeviceTerminated(deviceId))
device ! Device.ReadTemperature(0, respondTemperatureAdapter)
}
override def onMessage(msg: Command): Behavior[Command] =
msg match {
case WrappedRespondTemperature(response) => onRespondTemperature(response)
case DeviceTerminated(deviceId) => onDeviceTerminated(deviceId)
case CollectionTimeout => onCollectionTimout()
}
private def onRespondTemperature(response: Device.RespondTemperature): Behavior[Command] = {
val reading = response.value match {
case Some(value) => Temperature(value)
case None => TemperatureNotAvailable
}
val deviceId = response.deviceId
repliesSoFar += (deviceId -> reading)
stillWaiting -= deviceId
respondWhenAllCollected()
}
private def onDeviceTerminated(deviceId: String): Behavior[Command] = {
if (stillWaiting(deviceId)) {
repliesSoFar += (deviceId -> DeviceNotAvailable)
stillWaiting -= deviceId
}
respondWhenAllCollected()
}
private def onCollectionTimout(): Behavior[Command] = {
repliesSoFar ++= stillWaiting.map(deviceId => deviceId -> DeviceTimedOut)
stillWaiting = Set.empty
respondWhenAllCollected()
}
private def respondWhenAllCollected(): Behavior[Command] = {
if (stillWaiting.isEmpty) {
requester ! RespondAllTemperatures(requestId, repliesSoFar)
Behaviors.stopped
} else {
this
}
}
}
// --groupdevicequery
测试代码
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest._
import scala.concurrent.duration.DurationInt
class DeviceSpec extends ScalaTestWithActorTestKit with FlatSpecLike {
import Device._
"Device actor" should "reply with empty reading if no temperature is known" in {
val recordProbe = createTestProbe[TemperatureRecorded]()
val readProbe = createTestProbe[RespondTemperature]()
val deviceActor = spawn(Device("group", "device"))
deviceActor ! Device.RecordTemperature(requestId = 1, 24.0, recordProbe.ref)
recordProbe.expectMessage(Device.TemperatureRecorded(requestId = 1))
deviceActor ! Device.ReadTemperature(requestId = 2, readProbe.ref)
val response1 = readProbe.receiveMessage()
response1.requestId should ===(2)
response1.value should ===(Some(24.0))
deviceActor ! Device.RecordTemperature(requestId = 3, 55.0, recordProbe.ref)
recordProbe.expectMessage(Device.TemperatureRecorded(requestId = 3))
deviceActor ! Device.ReadTemperature(requestId = 4, readProbe.ref)
val response2 = readProbe.receiveMessage()
response2.requestId should ===(4)
response2.value should ===(Some(55.0))
}
}
class GroupSpec extends ScalaTestWithActorTestKit with FlatSpecLike{
import Device._
import DeviceGroup._
import DeviceManager._
import DeviceGroupQuery._
"group actor" should "be able to register a device actor" in {
val probe = createTestProbe[DeviceRegistered]()
val groupActor = spawn(DeviceGroup("group"))
groupActor ! RequestTrackDevice("group", "device1", probe.ref)
val registered1 = probe.receiveMessage()
val deviceActor1 = registered1.device
// another deviceId
groupActor ! RequestTrackDevice("group", "device2", probe.ref)
val registered2 = probe.receiveMessage()
val deviceActor2 = registered2.device
deviceActor1 should !==(deviceActor2)
// Check that the device actors are working
val recordProbe = createTestProbe[TemperatureRecorded]()
deviceActor1 ! RecordTemperature(requestId = 0, 1.0, recordProbe.ref)
recordProbe.expectMessage(TemperatureRecorded(requestId = 0))
deviceActor2 ! Device.RecordTemperature(requestId = 1, 2.0, recordProbe.ref)
recordProbe.expectMessage(Device.TemperatureRecorded(requestId = 1))
}
"group actor" should "ignore requests for wrong groupId" in {
val probe = createTestProbe[DeviceRegistered]()
val groupActor = spawn(DeviceGroup("group"))
groupActor ! RequestTrackDevice("wrongGroup", "device1", probe.ref)
probe.expectNoMessage(500.milliseconds)
}
"group actor" should "return same actor for same deviceId" in {
val probe = createTestProbe[DeviceRegistered]()
val groupActor = spawn(DeviceGroup("group"))
groupActor ! RequestTrackDevice("group", "device1", probe.ref)
val registered1 = probe.receiveMessage()
// registering same again should be idempotent
groupActor ! RequestTrackDevice("group", "device1", probe.ref)
val registered2 = probe.receiveMessage()
registered1.device should ===(registered2.device)
}
"group actor" should "be able to list active devices" in {
val registeredProbe = createTestProbe[DeviceRegistered]()
val groupActor = spawn(DeviceGroup("group"))
groupActor ! RequestTrackDevice("group", "device1", registeredProbe.ref)
registeredProbe.receiveMessage()
groupActor ! RequestTrackDevice("group", "device2", registeredProbe.ref)
registeredProbe.receiveMessage()
val deviceListProbe = createTestProbe[ReplyDeviceList]()
groupActor ! RequestDeviceList(requestId = 0, groupId = "group", deviceListProbe.ref)
deviceListProbe.expectMessage(ReplyDeviceList(requestId = 0, Set("device1", "device2")))
}
"group actor" should "be able to list active devices after one shuts down" in {
val registeredProbe = createTestProbe[DeviceRegistered]()
val groupActor = spawn(DeviceGroup("group"))
groupActor ! RequestTrackDevice("group", "device1", registeredProbe.ref)
val registered1 = registeredProbe.receiveMessage()
val toShutDown = registered1.device
groupActor ! RequestTrackDevice("group", "device2", registeredProbe.ref)
registeredProbe.receiveMessage()
val deviceListProbe = createTestProbe[ReplyDeviceList]()
groupActor ! RequestDeviceList(requestId = 0, groupId = "group", deviceListProbe.ref)
deviceListProbe.expectMessage(ReplyDeviceList(requestId = 0, Set("device1", "device2")))
toShutDown ! Passivate
registeredProbe.expectTerminated(toShutDown, registeredProbe.remainingOrDefault)
// using awaitAssert to retry because it might take longer for the groupActor
// to see the Terminated, that order is undefined
registeredProbe.awaitAssert {
groupActor ! RequestDeviceList(requestId = 1, groupId = "group", deviceListProbe.ref)
deviceListProbe.expectMessage(ReplyDeviceList(requestId = 1, Set("device2")))
}
}
"group query" should "return temperature value for working devices" in {
val requester = createTestProbe[RespondAllTemperatures]()
val device1 = createTestProbe[Device.Command]()
val device2 = createTestProbe[Device.Command]()
val deviceIdToActor = Map("device1" -> device1.ref, "device2" -> device2.ref)
val queryActor =
spawn(DeviceGroupQuery(deviceIdToActor, requestId = 1, requester = requester.ref, timeout = 3.seconds))
device1.expectMessageType[Device.ReadTemperature]
device2.expectMessageType[Device.ReadTemperature]
queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device1", Some(1.0)))
queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device2", Some(2.0)))
requester.expectMessage(
RespondAllTemperatures(
requestId = 1,
temperatures = Map("device1" -> Temperature(1.0), "device2" -> Temperature(2.0))))
}
"group query" should "return TemperatureNotAvailable for devices with no readings" in {
val requester = createTestProbe[RespondAllTemperatures]()
val device1 = createTestProbe[Device.Command]()
val device2 = createTestProbe[Device.Command]()
val deviceIdToActor = Map("device1" -> device1.ref, "device2" -> device2.ref)
val queryActor =
spawn(DeviceGroupQuery(deviceIdToActor, requestId = 1, requester = requester.ref, timeout = 3.seconds))
device1.expectMessageType[Device.ReadTemperature]
device2.expectMessageType[Device.ReadTemperature]
queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device1", None))
queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device2", Some(2.0)))
requester.expectMessage(
RespondAllTemperatures(
requestId = 1,
temperatures = Map("device1" -> TemperatureNotAvailable, "device2" -> Temperature(2.0))))
}
"group query" should "return DeviceNotAvailable if device stops before answering" in {
val requester = createTestProbe[RespondAllTemperatures]()
val device1 = createTestProbe[Device.Command]()
val device2 = createTestProbe[Device.Command]()
val deviceIdToActor = Map("device1" -> device1.ref, "device2" -> device2.ref)
val queryActor =
spawn(DeviceGroupQuery(deviceIdToActor, requestId = 1, requester = requester.ref, timeout = 3.seconds))
device1.expectMessageType[Device.ReadTemperature]
device2.expectMessageType[Device.ReadTemperature]
queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device1", Some(2.0)))
device2.stop()
requester.expectMessage(
RespondAllTemperatures(
requestId = 1,
temperatures = Map("device1" -> Temperature(2.0), "device2" -> DeviceNotAvailable)))
}
"group query" should "return temperature reading even if device stops after answering" in {
val requester = createTestProbe[RespondAllTemperatures]()
val device1 = createTestProbe[Device.Command]()
val device2 = createTestProbe[Device.Command]()
val deviceIdToActor = Map("device1" -> device1.ref, "device2" -> device2.ref)
val queryActor =
spawn(DeviceGroupQuery(deviceIdToActor, requestId = 1, requester = requester.ref, timeout = 3.seconds))
device1.expectMessageType[Device.ReadTemperature]
device2.expectMessageType[Device.ReadTemperature]
queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device1", Some(1.0)))
queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device2", Some(2.0)))
device2.stop()
requester.expectMessage(
RespondAllTemperatures(
requestId = 1,
temperatures = Map("device1" -> Temperature(1.0), "device2" -> Temperature(2.0))))
}
"group query" should "return DeviceTimedOut if device does not answer in time" in {
val requester = createTestProbe[RespondAllTemperatures]()
val device1 = createTestProbe[Device.Command]()
val device2 = createTestProbe[Device.Command]()
val deviceIdToActor = Map("device1" -> device1.ref, "device2" -> device2.ref)
val queryActor =
spawn(DeviceGroupQuery(deviceIdToActor, requestId = 1, requester = requester.ref, timeout = 200.millis))
device1.expectMessageType[Device.ReadTemperature]
device2.expectMessageType[Device.ReadTemperature]
queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device1", Some(1.0)))
// no reply from device2
requester.expectMessage(
RespondAllTemperatures(
requestId = 1,
temperatures = Map("device1" -> Temperature(1.0), "device2" -> DeviceTimedOut)))
}
"group query" should "be able to collect temperatures from all active devices" in {
val registeredProbe = createTestProbe[DeviceRegistered]()
val groupActor = spawn(DeviceGroup("group"))
groupActor ! RequestTrackDevice("group", "device1", registeredProbe.ref)
val deviceActor1 = registeredProbe.receiveMessage().device
groupActor ! RequestTrackDevice("group", "device2", registeredProbe.ref)
val deviceActor2 = registeredProbe.receiveMessage().device
groupActor ! RequestTrackDevice("group", "device3", registeredProbe.ref)
registeredProbe.receiveMessage()
// Check that the device actors are working
val recordProbe = createTestProbe[TemperatureRecorded]()
deviceActor1 ! RecordTemperature(requestId = 0, 1.0, recordProbe.ref)
recordProbe.expectMessage(TemperatureRecorded(requestId = 0))
deviceActor2 ! RecordTemperature(requestId = 1, 2.0, recordProbe.ref)
recordProbe.expectMessage(TemperatureRecorded(requestId = 1))
// No temperature for device3
val allTempProbe = createTestProbe[RespondAllTemperatures]()
groupActor ! RequestAllTemperatures(requestId = 0, groupId = "group", allTempProbe.ref)
allTempProbe.expectMessage(
RespondAllTemperatures(
requestId = 0,
temperatures =
Map("device1" -> Temperature(1.0), "device2" -> Temperature(2.0), "device3" -> TemperatureNotAvailable)))
}
}
小结
- 官方给的例子使用了很多reactive design patterms,一个典型的pattern是request-response pattern 比如下面这段代码
final case class RequestDeviceList(requestId: Long, groupId: String, replyTo: ActorRef[ReplyDeviceList])
extends DeviceManager.Command
with DeviceGroup.Command
...........
case RequestDeviceList(requestId, gId, replyTo) =>
if (gId == groupId) {
replyTo ! ReplyDeviceList(requestId, deviceIdToActor.keySet)
this
} else
Behaviors.unhandled
又比如
final case class RequestAllTemperatures(requestId: Long, groupId: String, replyTo: ActorRef[RespondAllTemperatures])
extends DeviceGroupQuery.Command
with DeviceGroup.Command
with DeviceManager.Command
......
case RequestAllTemperatures(requestId, gId, replyTo) =>
if (gId == groupId) {
context.spawnAnonymous(
DeviceGroupQuery(deviceIdToActor, requestId = requestId, requester = replyTo, 3.seconds))
this
} else
Behaviors.unhandled
总结为这样的形式
case class <request> (...., replyTo:<respond>)
-
并发编程模型除了actor模型,还有future promise,这也是Scala提供的并发框架
-
模式匹配使用频率非常多,需要牢固掌握