Spark的流式计算经常需要模拟流数据,这时就需要使用定时器每隔一秒或每隔几秒向Kafka写入模拟数据,模拟数据的方法及发送策略的制定可以参考这一篇,大体思想就是设置一天模拟的总数据量,然后使用一定的算法分解到每一秒中,再把分解好的数字序列存放到一个大的数组中,最后使用定时器从头到尾扫描这个数组,逐一取出再组装相应的模拟数据量就是了。
可以使用原生 Java 的 Timer 定时调度工具来实现这个定时器,但是这里采用一种另类的方式来实现——采用Akka的actor来实现。整体来说比较简单,直接上代码
package com.datacrafts.digitalevers.simulationClick
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.datacrafts.digitalevers.simulationClick.common._
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration
import scala.concurrent.duration.FiniteDuration
class sendClick extends Actor{
override def receive: Receive = {
case "start" =>{
println("客户端启动")
import context.dispatcher
context.system.scheduler.scheduleWithFixedDelay(FiniteDuration(0,duration.MILLISECONDS),FiniteDuration(1000,duration.MILLISECONDS), self, sendData2Kafka)
}
case sendData2Kafka=>{
println("客户端向Kafka发送数据~")
}
}
}
object sendClick {
def main(args: Array[String]): Unit = {
val host = "127.0.0.1"
val port = 10006
val config = ConfigFactory.parseString(s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.actor.allow-java-serialization = on
|akka.remote.artery.canonical.hostname=$host
|akka.remote.artery.canonical.port=$port""".stripMargin)
val actorSystem = ActorSystem("work",config)
val sparkWorkRef: ActorRef = actorSystem.actorOf(Props(new sendClick),"sparkWork")
sparkWorkRef ! "start"
}
}
这里采用的是 Akka2.6.8,其他版本的配置参数可能会有些许不同,读者可自行查阅文档。host 和 port 可配可不配,这里还是配置上了,如果不配置的话,Akka会默认当前主机,同时随机打开一个端口供 actor 使用。
如代码所见,这个例程去掉了 Akka 与服务端的交互,而 Akka 的一般应用场景就是与服务端的交互。这里只利用到 actor 对自身发送消息这一机制
当actor启动后,便启动定时器每隔一秒向自身发送一个 sendData2Kafka 对象消息,这个空消息对象定义如下
package com.datacrafts.digitalevers.simulationClick.common
case class sendData2Kafka(){}
当 actor 接受到自身的这个对象消息后,便会执行 case sendData2Kafka 内的代码,这里便可以实现向 Kafka 写数据的逻辑。
以上便是整个流程,如此简单。
近期评论