Skip to content
matt90luo's Blog
Go back

akka学习笔记(一)

akka是什么? akka是一个基于actor并发模型的框架,该模型可以追溯到1973年发表的论文《A Universal Modular Actor Formalism for Artificial Intelligence 》, Actor就源于该模型。

actor通过消息传递的方式与外界通信。消息传递是异步的。每个actor都有一个邮箱,该邮箱接收并缓存其他actor发过来的消息,actor一次只能同步处理一个消息,处理消息过程中,除了可以接收消息,不能做任何其他操作。

<!— more —>

Akka的哲学是基于End to End Argument这个事实的,即:在分布式环境中,reliable的通信需要在application层来确保,中间件层或者infrastructure只能提供效率上的帮助,而绝对无法保证reliability。

akka是一个基于actor模型的框架,提供了极致的异步并发处理性能,但是分布式条件下的可靠性需要使用者来保障

akka模块

以下列出了akka核心模块,所有akka的核心功能模块均遵循开源软件协议(Open Source Software (OSS)), Lightbend同时也提供商业版本,提供基于akka的增强功能。

如果需要使用akka内部的module,需要添加如下依赖

val AkkaVersion = "2.5.21"

libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion

libraryDependencies += "com.typesafe.akka" %% "akka-remote" % AkkaVersion

libraryDependencies += "com.typesafe.akka" %% "akka-cluster-typed" % AkkaVersion

libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion

libraryDependencies += "com.typesafe.akka" %% "akka-cluster-singleton" % AkkaVersion

libraryDependencies += "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion

libraryDependencies += "com.typesafe.akka" %% "akka-projection-core" % AkkaVersion

libraryDependencies += "com.typesafe.akka" %% "akka-cluster-typed" % AkkaVersion

libraryDependencies += "com.typesafe.akka" %% "akka-stream-typed" % AkkaVersion

actor层次结构

无论何时terminate actor, 它的子actor会首先递归停止. 停止一个子actor可以通过在其parent actor调用 context.stop(childRef),

object StartStopActor1 &#123;
  def apply(): Behavior[String] =
    Behaviors.setup(context =&gt; new StartStopActor1(context))
&#125;

class StartStopActor1(context: ActorContext[String]) extends AbstractBehavior[String]&#123;
  println("first started")
  context.spawn(StartStopActor2(), "second")

  override def onMessage(msg: String): Behavior[String] =
    msg match &#123;
      case "stop" =&gt; Behaviors.stopped
    &#125;

  override def onSignal: PartialFunction[Signal, Behavior[String]] = &#123;
    case PostStop =&gt;
      println("first stopped")
      this
  &#125;

&#125;

object StartStopActor2 &#123;
  def apply(): Behavior[String] =
    Behaviors.setup(new StartStopActor2(_))
&#125;

class StartStopActor2(context: ActorContext[String]) extends AbstractBehavior[String] &#123;
  println("second started")

  override def onMessage(msg: String): Behavior[String] = &#123;
    // no messages handled by this actor
    Behaviors.unhandled
  &#125;

  override def onSignal: PartialFunction[Signal, Behavior[String]] = &#123;
    case PostStop =&gt;
      println("second stopped")
      this
  &#125;
&#125;

object Main &#123;
  def apply(): Behavior[String] =
    Behaviors.setup(context =&gt; new Main(context))

&#125;

class Main(context: ActorContext[String]) extends AbstractBehavior[String](context) &#123;
  override def onMessage(msg: String): Behavior[String] =
    msg match &#123;
      case "first" =&gt;
        val first = context.spawn(StartStopActor1(), "first")
        println(s"fistActor: $&#123;first&#125;")
        Thread.sleep(4000)
        first ! "stop"
        this
    &#125;
&#125;

object ActorHierarchyExperiments extends App &#123;
  val testSystem = ActorSystem(Main(), "testSystem")
  testSystem ! "first"
&#125;

输出结果如下

first started
second started
second stopped
first stopped

很显然,second 首先被terminated 然后才是actor1,以上代码就说明了actor的这个特性: 在parent actor停止之前,会递归地停止它的子actor。

异常处理

和传统的防御性编程不同,Akka沿袭了Erlang的let it crash哲学,当actor内部发生异常时,并不试图捕捉异常并处理,而是重建一个新的actor, 使得整个系统在错误发生的时候可以自动恢复。

object SupervisingActor &#123;
  def apply(): Behavior[String] =
    Behaviors.setup(context =&gt; new SupervisingActor(context))
&#125;

class SupervisingActor(context: ActorContext[String]) extends AbstractBehavior[String] &#123;
  private val child = context.spawn(
    Behaviors.supervise(SupervisedActor()).onFailure(SupervisorStrategy.restart),
    name = "supervised-actor")

  override def onMessage(msg: String): Behavior[String] =
    msg match &#123;
      case "failChild" =&gt;
        child ! "fail"
        this
    &#125;
&#125;

object SupervisedActor &#123;
  def apply(): Behavior[String] =
    Behaviors.setup(context =&gt; new SupervisedActor(context))
&#125;

class SupervisedActor(context: ActorContext[String]) extends AbstractBehavior[String] &#123;
  println("supervised actor started")

  override def onMessage(msg: String): Behavior[String] =
    msg match &#123;
      case "fail" =&gt;
        println("supervised actor fails now")
        throw new Exception("I failed!")
    &#125;

  override def onSignal: PartialFunction[Signal, Behavior[String]] = &#123;
    case PreRestart =&gt;
      println("supervised actor will be restarted")
      this
    case PostStop =&gt;
      println("supervised actor stopped")
      this
  &#125;
&#125;

object Main &#123;
  def apply(): Behavior[String] =
    Behaviors.setup(context =&gt; new Main(context))

&#125;

class Main(context: ActorContext[String]) extends AbstractBehavior[String] &#123;
  override def onMessage(msg: String): Behavior[String] =
    msg match &#123;

      case "first" =&gt;
        val first = context.spawn(StartStopActor1(), "first")
        println(s"fistActor: $&#123;first&#125;")
        Thread.sleep(4000)
        first ! "stop"
        this

      case "failure"=&gt;
        val supervisingActor = context.spawn(SupervisingActor(), "supervising-actor")
        supervisingActor ! "failChild"
        this
    &#125;
&#125;

object ActorHierarchyExperiments extends App &#123;
  val testSystem = ActorSystem(Main(), "testSystem")
  testSystem ! "failure"
&#125;

会看到以下输出:

supervised actor started
supervised actor fails now
supervised actor will be restarted
supervised actor started
[ERROR] [06/30/2021 12:01:41.800] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/supervising-actor/supervised-actor] Supervisor RestartSupervisor saw failure: I failed!
java.lang.Exception: I failed!

在发生failure之后,被监控的actor停止然后立刻restarted(这就是为什么会打印出supervised actor will be restarted 但是没有supervised actor stopped)

一个IoT例子

官方文档提供了一个IoT例子,在这个IoT系统中分别有三个角色DevcieActor, DeviceGroup, DeviceManager以及一哥查询功能queryActor

DeviceActor这个角色有两个任务,记录温度,读取温度。

DevcieGroup 需要保存记录在Group下的device信息(使用一个内部Map对象实现),主要任务有两个,当创建一个新的deviceActor时监控它,当收到actordevice stop消息需要从Map中移除。这个功能我们用以下代码实现


context.watchWith(deviceActor, DeviceTerminated(deviceActor, groupId, deviceId))
//Registers this actor as a Monitor for the provided ActorRef. This actor will receive the specified message when watched actor is terminated.

上述代码的含义是,当deviceActor is terminated, 当前actor会收到消息DeviceTerminated(deviceActor, groupId, deviceId)

DeviceManager和DeviceGroup任务类似,同样要按需注册groupActor以及当groupActor停止时移除

在一个非query场景下,我们考虑的状态信息时很少的,官网中是这样说的:

然而,假设我们需要查询一个group下的所有temperature,可能会发生这些意外

这些情形确实客观存在,最重要的是行为状态确定化,官网给出的solution如下

  1. When a query arrives, the group actor takes a snapshot of the existing device actors and will only ask those actors for the temperature.
  2. Actors that start up after the query arrives are ignored.
  3. If an actor in the snapshot stops during the query without answering, we will report the fact that it stopped to the sender of the query message.
  4. All actors in the snapshot have either responded or have confirmed being stopped.
  5. We reach a pre-defined deadline.

综合以上场景,我们将定义4种devcie状态来应对query

这个query任务也会单独作为一类actor存在,它需要获得以下信息:

  1. The snapshot and IDs of active device actors to query.
  2. The ID of the request that started the query (so that we can include it in the reply).
  3. The reference of the actor who sent the query. We will send the reply to this actor directly.
  4. A deadline that indicates how long the query should wait for replies. Making this a parameter will simplify testing.

具体实现时,通过一个set记录正在排队等待temperature的deviceActor, 一个map用来记录查询到的temperature

完整的代码

import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.PostStop
import akka.actor.typed.Signal
import akka.actor.typed.scaladsl.&#123;AbstractBehavior, ActorContext, Behaviors, TimerScheduler&#125;

import scala.concurrent.duration.&#123;DurationInt, FiniteDuration&#125;


object Device &#123;
  def apply(groupId: String, deviceId: String): Behavior[Command] =
    Behaviors.setup(context =&gt; new Device(context, groupId, deviceId))

  sealed trait Command

  //typical Request-Response pattern
  final case class ReadTemperature(requestId: Long, replyTo: ActorRef[RespondTemperature]) extends Command
  final case class RespondTemperature(requestId: Long, deviceId: String, value: Option[Double])

  final case class RecordTemperature(requestId: Long, value: Double, replyTo: ActorRef[TemperatureRecorded])
    extends Command
  final case class TemperatureRecorded(requestId: Long)

  case object Passivate extends Command

&#125;

class Device(context: ActorContext[Device.Command], groupId: String, deviceId: String)
  extends AbstractBehavior[Device.Command] &#123;
  import Device._

  var lastTemperatureReading: Option[Double] = None

  context.log.info(s"Device actor <Latex formula="\{groupId\}-" inline />&#123;deviceId&#125; started" )

  override def onMessage(msg: Command): Behavior[Command] = &#123;
    msg match &#123;
      case RecordTemperature(id, value, replyTo) =&gt;
        context.log.info(s"Recorded temperature reading <Latex formula="\{value\} with" inline />&#123;id&#125;")
        lastTemperatureReading = Some(value)
        replyTo ! TemperatureRecorded(id)
        this

      case ReadTemperature(id, replyTo) =&gt;
        replyTo ! RespondTemperature(id, deviceId, lastTemperatureReading)
        this

      case Passivate =&gt;
        Behaviors.stopped
    &#125;
  &#125;

  override def onSignal: PartialFunction[Signal, Behavior[Command]] = &#123;
    case PostStop =&gt;
      context.log.info(s"Device actor <Latex formula="\{groupId\}-" inline />&#123;deviceId&#125; stopped")
      this
  &#125;

&#125;


// device group
object DeviceGroup &#123;
  def apply(groupId: String): Behavior[Command] =
    Behaviors.setup(context =&gt; new DeviceGroup(context, groupId))

  trait Command

  private final case class DeviceTerminated(device: ActorRef[Device.Command], groupId: String, deviceId: String)
    extends Command

&#125;

class DeviceGroup(context: ActorContext[DeviceGroup.Command], groupId: String)
  extends AbstractBehavior[DeviceGroup.Command] &#123;
  import DeviceGroup._
  import DeviceManager.&#123; DeviceRegistered, ReplyDeviceList, RequestDeviceList, RequestTrackDevice, RequestAllTemperatures &#125;

  private var deviceIdToActor = Map.empty[String, ActorRef[Device.Command]]

  context.log.info(s"DeviceGroup $&#123;groupId&#125; started")

  override def onMessage(msg: Command): Behavior[Command] =
    msg match &#123;
      case trackMsg @ RequestTrackDevice(`groupId`, deviceId, replyTo) =&gt;
        deviceIdToActor.get(deviceId) match &#123;
          case Some(deviceActor) =&gt;
            replyTo ! DeviceRegistered(deviceActor)
          case None =&gt;
            context.log.info(s"Creating device actor for $&#123;trackMsg.deviceId&#125;")
            val deviceActor = context.spawn(Device(groupId, deviceId), s"device-$deviceId")
            context.watchWith(deviceActor, DeviceTerminated(deviceActor, groupId, deviceId))
            deviceIdToActor += deviceId -&gt; deviceActor
            replyTo ! DeviceRegistered(deviceActor)
        &#125;
        this

      case RequestTrackDevice(gId, _, _) =&gt;
        context.log.warning(s"Ignoring TrackDevice request for <Latex formula="\{gId\}. This actor is responsible for" inline />&#123;groupId&#125;.")
        this

      case RequestDeviceList(requestId, gId, replyTo) =&gt;
        if (gId == groupId) &#123;
          replyTo ! ReplyDeviceList(requestId, deviceIdToActor.keySet)
          this
        &#125; else
          Behaviors.unhandled

      case DeviceTerminated(_, _, deviceId) =&gt;
        context.log.info(s"Device actor for $&#123;deviceId&#125; has been terminated")
        deviceIdToActor -= deviceId
        this

      case RequestAllTemperatures(requestId, gId, replyTo) =&gt;
        if (gId == groupId) &#123;
          context.spawnAnonymous(
            DeviceGroupQuery(deviceIdToActor, requestId = requestId, requester = replyTo, 3.seconds))
          this
        &#125; else
          Behaviors.unhandled

    &#125;

  override def onSignal: PartialFunction[Signal, Behavior[Command]] = &#123;
    case PostStop =&gt;
      context.log.info(s"DeviceGroup $&#123;groupId&#125; stopped")
      this
  &#125;
&#125;
// ---- device group

// device manager
object DeviceManager &#123;
  def apply(): Behavior[Command] =
    Behaviors.setup(context =&gt; new DeviceManager(context))


  sealed trait Command

  final case class RequestTrackDevice(groupId: String, deviceId: String, replyTo: ActorRef[DeviceRegistered])
    extends DeviceManager.Command
      with DeviceGroup.Command

  final case class DeviceRegistered(device: ActorRef[Device.Command])

  final case class RequestDeviceList(requestId: Long, groupId: String, replyTo: ActorRef[ReplyDeviceList])
    extends DeviceManager.Command
      with DeviceGroup.Command

  final case class ReplyDeviceList(requestId: Long, ids: Set[String])

  private final case class DeviceGroupTerminated(groupId: String) extends DeviceManager.Command


  final case class RequestAllTemperatures(requestId: Long, groupId: String, replyTo: ActorRef[RespondAllTemperatures])
    extends DeviceGroupQuery.Command
      with DeviceGroup.Command
      with DeviceManager.Command

  final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading])

  sealed trait TemperatureReading
  final case class Temperature(value: Double) extends TemperatureReading
  case object TemperatureNotAvailable extends TemperatureReading
  case object DeviceNotAvailable extends TemperatureReading
  case object DeviceTimedOut extends TemperatureReading

&#125;

class DeviceManager(context: ActorContext[DeviceManager.Command])
  extends AbstractBehavior[DeviceManager.Command] &#123;
  import DeviceManager._

  var groupIdToActor = Map.empty[String, ActorRef[DeviceGroup.Command]]

  context.log.info("DeviceManager started")

  override def onMessage(msg: Command): Behavior[Command] =
    msg match &#123;
      case trackMsg @ RequestTrackDevice(groupId, _, replyTo) =&gt;
        groupIdToActor.get(groupId) match &#123;
          case Some(ref) =&gt;
            ref ! trackMsg
          case None =&gt;
            context.log.info(s"Creating device group actor for $&#123;groupId&#125;")
            val groupActor = context.spawn(DeviceGroup(groupId), "group-" + groupId)
            context.watchWith(groupActor, DeviceGroupTerminated(groupId))
            groupActor ! trackMsg
            groupIdToActor += groupId -&gt; groupActor
        &#125;
        this

      case req @ RequestDeviceList(requestId, groupId, replyTo) =&gt;
        groupIdToActor.get(groupId) match &#123;
          case Some(ref) =&gt;
            ref ! req
          case None =&gt;
            replyTo ! ReplyDeviceList(requestId, Set.empty)
        &#125;
        this

      case DeviceGroupTerminated(groupId) =&gt;
        context.log.info(s"Device group actor for $&#123;groupId&#125; has been terminated")
        groupIdToActor -= groupId
        this
    &#125;

  override def onSignal: PartialFunction[Signal, Behavior[Command]] = &#123;
    case PostStop =&gt;
      context.log.info("DeviceManager stopped")
      this
  &#125;

&#125;
// --device manager

// groupdevciequery
object DeviceGroupQuery &#123;

  def apply(
             deviceIdToActor: Map[String, ActorRef[Device.Command]],
             requestId: Long,
             requester: ActorRef[DeviceManager.RespondAllTemperatures],
             timeout: FiniteDuration): Behavior[Command] = &#123;
    Behaviors.setup &#123; context =&gt;
      Behaviors.withTimers &#123; timers =&gt;
        new DeviceGroupQuery(deviceIdToActor, requestId, requester, timeout, context, timers)
      &#125;
    &#125;
  &#125;

  trait Command

  private case object CollectionTimeout extends Command

  final case class WrappedRespondTemperature(response: Device.RespondTemperature) extends Command

  private final case class DeviceTerminated(deviceId: String) extends Command
&#125;

class DeviceGroupQuery(
                        deviceIdToActor: Map[String, ActorRef[Device.Command]],
                        requestId: Long,
                        requester: ActorRef[DeviceManager.RespondAllTemperatures],
                        timeout: FiniteDuration,
                        context: ActorContext[DeviceGroupQuery.Command],
                        timers: TimerScheduler[DeviceGroupQuery.Command])
  extends AbstractBehavior[DeviceGroupQuery.Command] &#123;

  import DeviceGroupQuery._
  import DeviceManager.DeviceNotAvailable
  import DeviceManager.DeviceTimedOut
  import DeviceManager.RespondAllTemperatures
  import DeviceManager.Temperature
  import DeviceManager.TemperatureNotAvailable
  import DeviceManager.TemperatureReading

  timers.startSingleTimer(CollectionTimeout, CollectionTimeout, timeout)

  private val respondTemperatureAdapter = context.messageAdapter(WrappedRespondTemperature.apply)

  private var repliesSoFar = Map.empty[String, TemperatureReading]
  private var stillWaiting = deviceIdToActor.keySet


  deviceIdToActor.foreach &#123;
    case (deviceId, device) =&gt;
      context.watchWith(device, DeviceTerminated(deviceId))
      device ! Device.ReadTemperature(0, respondTemperatureAdapter)
  &#125;

  override def onMessage(msg: Command): Behavior[Command] =
    msg match &#123;
      case WrappedRespondTemperature(response) =&gt; onRespondTemperature(response)
      case DeviceTerminated(deviceId)          =&gt; onDeviceTerminated(deviceId)
      case CollectionTimeout                   =&gt; onCollectionTimout()
    &#125;

  private def onRespondTemperature(response: Device.RespondTemperature): Behavior[Command] = &#123;
    val reading = response.value match &#123;
      case Some(value) =&gt; Temperature(value)
      case None        =&gt; TemperatureNotAvailable
    &#125;

    val deviceId = response.deviceId
    repliesSoFar += (deviceId -&gt; reading)
    stillWaiting -= deviceId

    respondWhenAllCollected()
  &#125;

  private def onDeviceTerminated(deviceId: String): Behavior[Command] = &#123;
    if (stillWaiting(deviceId)) &#123;
      repliesSoFar += (deviceId -&gt; DeviceNotAvailable)
      stillWaiting -= deviceId
    &#125;
    respondWhenAllCollected()
  &#125;

  private def onCollectionTimout(): Behavior[Command] = &#123;
    repliesSoFar ++= stillWaiting.map(deviceId =&gt; deviceId -&gt; DeviceTimedOut)
    stillWaiting = Set.empty
    respondWhenAllCollected()
  &#125;

  private def respondWhenAllCollected(): Behavior[Command] = &#123;
    if (stillWaiting.isEmpty) &#123;
      requester ! RespondAllTemperatures(requestId, repliesSoFar)
      Behaviors.stopped
    &#125; else &#123;
      this
    &#125;
  &#125;
&#125;
// --groupdevicequery

测试代码

import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest._
import scala.concurrent.duration.DurationInt

class DeviceSpec extends ScalaTestWithActorTestKit with FlatSpecLike &#123;

  import Device._

  "Device actor" should "reply with empty reading if no temperature is known" in &#123;

    val recordProbe = createTestProbe[TemperatureRecorded]()
    val readProbe = createTestProbe[RespondTemperature]()
    val deviceActor = spawn(Device("group", "device"))

    deviceActor ! Device.RecordTemperature(requestId = 1, 24.0, recordProbe.ref)
    recordProbe.expectMessage(Device.TemperatureRecorded(requestId = 1))

    deviceActor ! Device.ReadTemperature(requestId = 2, readProbe.ref)
    val response1 = readProbe.receiveMessage()
    response1.requestId should ===(2)
    response1.value should ===(Some(24.0))

    deviceActor ! Device.RecordTemperature(requestId = 3, 55.0, recordProbe.ref)
    recordProbe.expectMessage(Device.TemperatureRecorded(requestId = 3))

    deviceActor ! Device.ReadTemperature(requestId = 4, readProbe.ref)
    val response2 = readProbe.receiveMessage()
    response2.requestId should ===(4)
    response2.value should ===(Some(55.0))
  &#125;
&#125;

class GroupSpec extends ScalaTestWithActorTestKit with FlatSpecLike&#123;
  import Device._
  import DeviceGroup._
  import DeviceManager._
  import DeviceGroupQuery._

  "group actor" should "be able to register a device actor" in &#123;
    val probe = createTestProbe[DeviceRegistered]()
    val groupActor = spawn(DeviceGroup("group"))

    groupActor ! RequestTrackDevice("group", "device1", probe.ref)
    val registered1 = probe.receiveMessage()
    val deviceActor1 = registered1.device

    // another deviceId
    groupActor ! RequestTrackDevice("group", "device2", probe.ref)
    val registered2 = probe.receiveMessage()
    val deviceActor2 = registered2.device
    deviceActor1 should !==(deviceActor2)

    // Check that the device actors are working
    val recordProbe = createTestProbe[TemperatureRecorded]()
    deviceActor1 ! RecordTemperature(requestId = 0, 1.0, recordProbe.ref)
    recordProbe.expectMessage(TemperatureRecorded(requestId = 0))
    deviceActor2 ! Device.RecordTemperature(requestId = 1, 2.0, recordProbe.ref)
    recordProbe.expectMessage(Device.TemperatureRecorded(requestId = 1))
  &#125;

  "group actor" should "ignore requests for wrong groupId" in &#123;
    val probe = createTestProbe[DeviceRegistered]()
    val groupActor = spawn(DeviceGroup("group"))

    groupActor ! RequestTrackDevice("wrongGroup", "device1", probe.ref)
    probe.expectNoMessage(500.milliseconds)
  &#125;

  "group actor" should "return same actor for same deviceId" in &#123;
    val probe = createTestProbe[DeviceRegistered]()
    val groupActor = spawn(DeviceGroup("group"))

    groupActor ! RequestTrackDevice("group", "device1", probe.ref)
    val registered1 = probe.receiveMessage()

    // registering same again should be idempotent
    groupActor ! RequestTrackDevice("group", "device1", probe.ref)
    val registered2 = probe.receiveMessage()

    registered1.device should ===(registered2.device)
  &#125;


  "group actor" should "be able to list active devices" in &#123;
    val registeredProbe = createTestProbe[DeviceRegistered]()
    val groupActor = spawn(DeviceGroup("group"))

    groupActor ! RequestTrackDevice("group", "device1", registeredProbe.ref)
    registeredProbe.receiveMessage()

    groupActor ! RequestTrackDevice("group", "device2", registeredProbe.ref)
    registeredProbe.receiveMessage()

    val deviceListProbe = createTestProbe[ReplyDeviceList]()
    groupActor ! RequestDeviceList(requestId = 0, groupId = "group", deviceListProbe.ref)
    deviceListProbe.expectMessage(ReplyDeviceList(requestId = 0, Set("device1", "device2")))
  &#125;



  "group actor" should "be able to list active devices after one shuts down" in &#123;
    val registeredProbe = createTestProbe[DeviceRegistered]()
    val groupActor = spawn(DeviceGroup("group"))

    groupActor ! RequestTrackDevice("group", "device1", registeredProbe.ref)
    val registered1 = registeredProbe.receiveMessage()
    val toShutDown = registered1.device

    groupActor ! RequestTrackDevice("group", "device2", registeredProbe.ref)
    registeredProbe.receiveMessage()

    val deviceListProbe = createTestProbe[ReplyDeviceList]()
    groupActor ! RequestDeviceList(requestId = 0, groupId = "group", deviceListProbe.ref)
    deviceListProbe.expectMessage(ReplyDeviceList(requestId = 0, Set("device1", "device2")))

    toShutDown ! Passivate
    registeredProbe.expectTerminated(toShutDown, registeredProbe.remainingOrDefault)

    // using awaitAssert to retry because it might take longer for the groupActor
    // to see the Terminated, that order is undefined
    registeredProbe.awaitAssert &#123;
      groupActor ! RequestDeviceList(requestId = 1, groupId = "group", deviceListProbe.ref)
      deviceListProbe.expectMessage(ReplyDeviceList(requestId = 1, Set("device2")))
    &#125;
  &#125;

  "group query" should "return temperature value for working devices" in &#123;
    val requester = createTestProbe[RespondAllTemperatures]()

    val device1 = createTestProbe[Device.Command]()
    val device2 = createTestProbe[Device.Command]()

    val deviceIdToActor = Map("device1" -&gt; device1.ref, "device2" -&gt; device2.ref)

    val queryActor =
      spawn(DeviceGroupQuery(deviceIdToActor, requestId = 1, requester = requester.ref, timeout = 3.seconds))

    device1.expectMessageType[Device.ReadTemperature]
    device2.expectMessageType[Device.ReadTemperature]

    queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device1", Some(1.0)))
    queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device2", Some(2.0)))

    requester.expectMessage(
      RespondAllTemperatures(
        requestId = 1,
        temperatures = Map("device1" -&gt; Temperature(1.0), "device2" -&gt; Temperature(2.0))))
  &#125;


  "group query" should "return TemperatureNotAvailable for devices with no readings" in &#123;
    val requester = createTestProbe[RespondAllTemperatures]()

    val device1 = createTestProbe[Device.Command]()
    val device2 = createTestProbe[Device.Command]()

    val deviceIdToActor = Map("device1" -&gt; device1.ref, "device2" -&gt; device2.ref)

    val queryActor =
      spawn(DeviceGroupQuery(deviceIdToActor, requestId = 1, requester = requester.ref, timeout = 3.seconds))

    device1.expectMessageType[Device.ReadTemperature]
    device2.expectMessageType[Device.ReadTemperature]

    queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device1", None))
    queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device2", Some(2.0)))

    requester.expectMessage(
      RespondAllTemperatures(
        requestId = 1,
        temperatures = Map("device1" -&gt; TemperatureNotAvailable, "device2" -&gt; Temperature(2.0))))
  &#125;


  "group query" should "return DeviceNotAvailable if device stops before answering" in &#123;
    val requester = createTestProbe[RespondAllTemperatures]()

    val device1 = createTestProbe[Device.Command]()
    val device2 = createTestProbe[Device.Command]()

    val deviceIdToActor = Map("device1" -&gt; device1.ref, "device2" -&gt; device2.ref)

    val queryActor =
      spawn(DeviceGroupQuery(deviceIdToActor, requestId = 1, requester = requester.ref, timeout = 3.seconds))

    device1.expectMessageType[Device.ReadTemperature]
    device2.expectMessageType[Device.ReadTemperature]

    queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device1", Some(2.0)))

    device2.stop()

    requester.expectMessage(
      RespondAllTemperatures(
        requestId = 1,
        temperatures = Map("device1" -&gt; Temperature(2.0), "device2" -&gt; DeviceNotAvailable)))
  &#125;

  "group query" should "return temperature reading even if device stops after answering" in &#123;
    val requester = createTestProbe[RespondAllTemperatures]()

    val device1 = createTestProbe[Device.Command]()
    val device2 = createTestProbe[Device.Command]()

    val deviceIdToActor = Map("device1" -&gt; device1.ref, "device2" -&gt; device2.ref)

    val queryActor =
      spawn(DeviceGroupQuery(deviceIdToActor, requestId = 1, requester = requester.ref, timeout = 3.seconds))

    device1.expectMessageType[Device.ReadTemperature]
    device2.expectMessageType[Device.ReadTemperature]

    queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device1", Some(1.0)))
    queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device2", Some(2.0)))

    device2.stop()

    requester.expectMessage(
      RespondAllTemperatures(
        requestId = 1,
        temperatures = Map("device1" -&gt; Temperature(1.0), "device2" -&gt; Temperature(2.0))))
  &#125;


  "group query" should "return DeviceTimedOut if device does not answer in time" in &#123;
    val requester = createTestProbe[RespondAllTemperatures]()

    val device1 = createTestProbe[Device.Command]()
    val device2 = createTestProbe[Device.Command]()

    val deviceIdToActor = Map("device1" -&gt; device1.ref, "device2" -&gt; device2.ref)

    val queryActor =
      spawn(DeviceGroupQuery(deviceIdToActor, requestId = 1, requester = requester.ref, timeout = 200.millis))

    device1.expectMessageType[Device.ReadTemperature]
    device2.expectMessageType[Device.ReadTemperature]

    queryActor ! WrappedRespondTemperature(Device.RespondTemperature(requestId = 0, "device1", Some(1.0)))

    // no reply from device2

    requester.expectMessage(
      RespondAllTemperatures(
        requestId = 1,
        temperatures = Map("device1" -&gt; Temperature(1.0), "device2" -&gt; DeviceTimedOut)))
  &#125;



  "group query" should "be able to collect temperatures from all active devices" in &#123;
    val registeredProbe = createTestProbe[DeviceRegistered]()
    val groupActor = spawn(DeviceGroup("group"))

    groupActor ! RequestTrackDevice("group", "device1", registeredProbe.ref)
    val deviceActor1 = registeredProbe.receiveMessage().device

    groupActor ! RequestTrackDevice("group", "device2", registeredProbe.ref)
    val deviceActor2 = registeredProbe.receiveMessage().device

    groupActor ! RequestTrackDevice("group", "device3", registeredProbe.ref)
    registeredProbe.receiveMessage()

    // Check that the device actors are working
    val recordProbe = createTestProbe[TemperatureRecorded]()
    deviceActor1 ! RecordTemperature(requestId = 0, 1.0, recordProbe.ref)
    recordProbe.expectMessage(TemperatureRecorded(requestId = 0))
    deviceActor2 ! RecordTemperature(requestId = 1, 2.0, recordProbe.ref)
    recordProbe.expectMessage(TemperatureRecorded(requestId = 1))
    // No temperature for device3

    val allTempProbe = createTestProbe[RespondAllTemperatures]()
    groupActor ! RequestAllTemperatures(requestId = 0, groupId = "group", allTempProbe.ref)
    allTempProbe.expectMessage(
      RespondAllTemperatures(
        requestId = 0,
        temperatures =
          Map("device1" -&gt; Temperature(1.0), "device2" -&gt; Temperature(2.0), "device3" -&gt; TemperatureNotAvailable)))
  &#125;
&#125;

小结

  1. 官方给的例子使用了很多reactive design patterms,一个典型的pattern是request-response pattern 比如下面这段代码
final case class RequestDeviceList(requestId: Long, groupId: String, replyTo: ActorRef[ReplyDeviceList])
    extends DeviceManager.Command
      with DeviceGroup.Command

...........

case RequestDeviceList(requestId, gId, replyTo) =&gt;
        if (gId == groupId) &#123;
          replyTo ! ReplyDeviceList(requestId, deviceIdToActor.keySet)
          this
        &#125; else
          Behaviors.unhandled

又比如

  final case class RequestAllTemperatures(requestId: Long, groupId: String, replyTo: ActorRef[RespondAllTemperatures])
    extends DeviceGroupQuery.Command
      with DeviceGroup.Command
      with DeviceManager.Command

......

      case RequestAllTemperatures(requestId, gId, replyTo) =&gt;
        if (gId == groupId) &#123;
          context.spawnAnonymous(
            DeviceGroupQuery(deviceIdToActor, requestId = requestId, requester = replyTo, 3.seconds))
          this
        &#125; else
          Behaviors.unhandled

总结为这样的形式

case class &lt;request&gt; (...., replyTo:&lt;respond&gt;)
  1. 并发编程模型除了actor模型,还有future promise,这也是Scala提供的并发框架

  2. 模式匹配使用频率非常多,需要牢固掌握


Share this post on:

Previous Post
推荐系统-基于FM模型的召回
Next Post
从多臂赌博机谈起