package com.datacrafts.digitalevers.link
/**
* 经典的约瑟夫环问题
* 本质就是一个单向循环链表的实现
* @param n 链表中初始节点个数
*/
class Josephus(n:Int) {
var length = 0
var head:node = null
var curr:node = null
var prev:node = null
for(index <- 1 to n){
if(length == 0){
curr = new node(index)
prev = curr
head = curr
curr.next = curr
} else {
val temp = new node(index)
temp.next = head
curr.next = temp
prev = curr
curr = temp
}
length += 1
}
//初始化完成后复位
curr = head
prev = head
/**
* 移动到第k个人
* 1 <= k <= length
*/
def moveToK(k:Int): Unit ={
for(index <- 1 until k){
prev = curr
curr = curr.next
}
}
/**
* 开始丢手绢过程
* 丢到第 m 个人则退出
* 若curr 与 prev相等 说明循环链表中只剩一个节点 则结束循环
*/
def circle(m:Int): Unit ={
while(curr != prev){
for(index <- 1 until m){
prev = curr
curr = curr.next
}
//循环结束 删除当前节点
println(curr.id)
prev.next = curr.next
curr = curr.next
length -= 1
}
println(curr.id)
}
/**
* 展示约瑟夫环上所有节点数据
*/
def show(): Unit = {
var curr = head
for(index <- 1 to length){
println(curr.id)
curr = curr.next
}
}
}
/**
* 节点类型
* @param _id
*/
class node(_id:Int){
var id:Int = _id
var next:node = null
}
object Josephus{
def main(args: Array[String]): Unit = {
var josephus = new Josephus(7)
//josephus.show()
josephus.moveToK(4)
//println(josephus.curr.id)
//println(josephus.prev.id)
josephus.circle(3)
//println(josephus.length)
}
}
package com.datacrafts.digitalevers.generic
import util.control.Breaks._
import scala.reflect.runtime.universe._
class twoWay[T,U](in:T,v:U) {
private var size = 0
var head:node[T,U] = new node[T,U](in,v)
//比较器
val cc = new commonCompare
/**
* 链表尾部添加节点
* @param newNode
*/
def addNode(newNode:node[T,U]): Unit = {
var temp = head
while(temp.next != null){
temp = temp.next
}
temp.next = newNode
newNode.prev = temp
size += 1
}
/**
* 按顺序插入节点
*/
def addNodeOrder(newNode:node[T,U])(implicit t:TypeTag[T]): Unit = {
var curr = head
var prev = head
t.tpe match {
case tpe if tpe == typeOf[Int]=>{
breakable {
while (curr != null) {
prev = curr
curr = curr.next
if (curr != null && cc.greater(curr.id.asInstanceOf[Int], newNode.id.asInstanceOf[Int]) == false) {
//如果找到了这个比新节点数据更小或相等的节点 跳出循环
break()
}
}
}
}
case tpe if tpe == typeOf[Double] =>{
breakable {
while (curr != null) {
prev = curr
curr = curr.next
if (curr != null && cc.greater(curr.id.asInstanceOf[Double], newNode.id.asInstanceOf[Double]) == false) {
//如果找到了这个比新节点数据更小或相等的节点 跳出循环
break()
}
}
}
}
}
/////match
if(curr == null){
//没有找到该节点则在链表尾部插入新节点
prev.next = newNode
newNode.prev = prev
} else {
//没有则在该节点前置插入新节点
newNode.next = curr
prev.next = newNode
curr.prev = newNode
newNode.prev = prev
}
size += 1
}
/**
* 更新节点数据域
*/
def updateNode(id:T,data:U){
var curr = head
while (curr != null && curr.id != id) {
curr = curr.next
}
if(curr != null){
curr.data = data
}
}
/**
* 删除节点(自我删除)
*/
def delete(id:T): Unit = {
/*var curr = head
var prev = head
while (curr != null && curr.id != id){
prev = curr
curr = curr.next
}
if(curr != null){
prev.next = curr.next
curr.next.prev = prev
curr.next = null
curr.prev = null
}*/
var curr = head
while (curr != null && curr.id != id){
curr = curr.next
}
if(curr != null){
curr.prev.next = curr.next
if(curr.next != null) {
curr.next.prev = curr.prev
}
//清除待删除节点自身与其他节点的关系
curr.next = null
curr.prev = null
}
}
/**
* 展示链表的所有数据
*/
def show(): Unit ={
var temp = head
while(temp.next != null){
temp = temp.next
println(temp)
}
}
def length = {
size
}
}
class node[T,U](in:T,v:U){
var id:T = in
var data:U = v
var prev:node[T,U] = null
var next:node[T,U] = null
override def toString: String = {
id + " "+data.toString
}
}
/**
* 泛型的通用比较类
*/
class commonCompare{
def greater[T<% Comparable[T]](t1:T,t2:T) =
if(t1.compareTo(t2) > 0) true else false
}
object testTwoWay{
def main(args: Array[String]): Unit = {
val linkTest = new twoWay(0,"")
val newNode = new node(13,"hello")
linkTest.addNodeOrder(newNode)
val newNode2 = new node(15,"scala")
linkTest.addNodeOrder(newNode2)
val newNode3 = new node(20,"java")
linkTest.addNodeOrder(newNode3)
//linkTest.updateNode(13,"php")
linkTest.delete(15)
linkTest.delete(13)
linkTest.addNodeOrder(newNode2)
linkTest.show()
//print(linkTest.length)
}
}
package com.datacrafts.digitalevers.generic
import util.control.Breaks._
import scala.reflect.runtime.universe._
/**
* 比较粗略实现了泛型版本的单向链表的增删改查操作,代码在scala 2.12.10上编译通过,测试运行通过。有几个点需要注意
* 1.上下边界 <: >:都没有隐式转换 只有视图边界 % 才有隐式转换
* 2.在按顺序往链表添加节点时 应将比较器的实例化对象提取到 链表类的属性层 而不应放在方法中 此举是为了解决内存使用
* 3.编译过程中,实例化比较器需要得知泛型参数的具体类型,之前在这个地方纠结了很久,最终无法通过隐式转换实行自动比较,故只能显式将其转换为Int 或者 Double类型
* 4.但是由于 scala 在运行过程中会擦除泛型类型,故使用了一个小技巧——加入一个隐式参数TypeTag[T]来保存运行时泛型类型,然后通过模式匹配来决定改转换成哪种对象(Int或者Double),然后会自动通过隐式转换为 Comparable 子类来进行比较操作
* 5.感觉还有更好的实现方式,以便可以支持更多的数据类型
* @param in
* @tparam T
*/
class genericClass[T,U](in:T,v:U) {
private var size = 0
var head:node[T,U] = new node[T,U](in,v)
//比较器
val cc = new commonCompare
/**
* 链表尾部添加节点
* @param newNode
*/
def addNode(newNode:node[T,U]): Unit = {
var temp = head
while(temp.next != null){
temp = temp.next
}
temp.next = newNode
size += 1
}
/**
* 按顺序插入节点
*/
def addNodeOrder(newNode:node[T,U])(implicit t:TypeTag[T]): Unit = {
var curr = head
var prev = head
t.tpe match {
case tpe if tpe == typeOf[Int]=>{
breakable {
while (curr != null) {
prev = curr
curr = curr.next
if (curr != null && cc.greater(curr.id.asInstanceOf[Int], newNode.id.asInstanceOf[Int]) == false) {
//如果找到了这个比新节点数据更小或相等的节点 跳出循环
break()
}
}
}
}
case tpe if tpe == typeOf[Double] =>{
breakable {
while (curr != null) {
prev = curr
curr = curr.next
if (curr != null && cc.greater(curr.id.asInstanceOf[Double], newNode.id.asInstanceOf[Double]) == false) {
//如果找到了这个比新节点数据更小或相等的节点 跳出循环
break()
}
}
}
}
}
/////match
if(curr == null){
//没有找到该节点则在链表尾部插入新节点
prev.next = newNode
} else {
//没有则在该节点前置插入新节点
newNode.next = curr
prev.next = newNode
}
size += 1
}
/**
* 更新节点数据域
*/
def updateNode(id:T,data:U){
var curr = head
while (curr != null && curr.id != id) {
curr = curr.next
}
if(curr != null){
curr.data = data
}
}
/**
* 删除节点
*/
def delete(id:T): Unit = {
var curr = head
var prev = head
while (curr != null && curr.id != id){
prev = curr
curr = curr.next
}
if(curr != null){
prev.next = curr.next
}
}
/**
* 展示链表的所有数据
*/
def show(): Unit ={
var temp = head
while(temp.next != null){
temp = temp.next
println(temp)
}
}
def length = {
size
}
}
class node[T,U](in:T,v:U){
var id:T = in
var data:U = v
var next:node[T,U] = null
override def toString: String = {
id + " "+data.toString
}
}
/**
* 泛型的通用比较类
*/
class commonCompare{
def greater[T<% Comparable[T]](t1:T,t2:T) =
if(t1.compareTo(t2) > 0) true else false
}
object test{
def main(args: Array[String]): Unit = {
val linkTest = new genericClass(0,"")
val newNode = new node(13,"hello")
linkTest.addNodeOrder(newNode)
val newNode2 = new node(15,"scala")
linkTest.addNodeOrder(newNode2)
linkTest.updateNode(13,"php")
//linkTest.delete(15)
linkTest.show()
//print(linkTest.length)
}
}
package com.datacrafts.digitalevers.queue
object oneWayQueueDemo{
def main(args: Array[String]): Unit = {
}
}
/**
*
*/
class oneWayQueue(maxSize:Int) {
var front = -1
var tail = -1
val length = maxSize
if(maxSize <= 0){
throw new Exception("参数错误")
}
var oneWayQueue = Array[Int](maxSize)
/**
* 队列是否为空
*/
def isEmpty(): Boolean = {
if(front == tail) true else false
}
/**
* 队列是否满员
*/
def isFull():Boolean = {
if(tail >= length - 1) true else false
}
/**
* 队列弹出数据
*/
def getData() = {
if(isEmpty()){
throw new Exception("队列为空")
}
front += 1
oneWayQueue(front)
}
/**
* 队列添加数据
* TODO 亦可以直接返回 Exception 类型使其参与逻辑运算
*/
def addData(inData:Int) = {
if(isFull()){
//throw new Exception("队列已满")
false
} else {
tail += 1
oneWayQueue(tail) = inData
true
}
}
////////
}
package com.datacrafts.digitalevers.curry
import java.io.{BufferedReader, Closeable, File, FileReader}
import com.esotericsoftware.minlog.Log
/**
* 这段代码使用函数柯里化的功能实现了读取文件源并将其转为List数据
* 函数柯里化的精髓是使得函数实现在依赖其他函数的情况下实现可插拔,从而在函数与函数之间的调用上进行解耦
* 比如这段代码可以在getLines底层函数上只保留 filename 一个参数,在函数体内直接调用 isReadable 和 closeStream方法来进行
* 周边逻辑的实现,一样可以达到效果,但是如果将来需要修改 isReadable 和 closeStream 的功能,不得不修改这两个方法的源代码
* 如果采用柯里化的形式 可以重新编写一个 isReadable 和 closeStream 的实现,在上层调用的时候直接将新函数作为参数传入即可
* 从而符合ocp原则。
*/
object FileReader {
/**
* 柯里化后的函数
* @param filename
* @param isFileReadable
* @param closableStream
* @return
*/
def getLines (filename: String) (isFileReadable: (File) => Boolean) (closableStream: (Closeable) => Unit): String = {
val file = new File(filename)
if (isFileReadable (file) ) {
val readerStream = new FileReader(file)
val buffer = new BufferedReader(readerStream)
try {
var content = ""
var str = ""
var isReadOver = false
while (! isReadOver) {
str = buffer.readLine ()
if (str == null) {
isReadOver = true
} else {
content += str
}
}
content
} finally {
closableStream (buffer)
closableStream (readerStream)
}
} else {
""
}
}
/**
* 判断文件是否可读
* @param file
* @return
*/
def isReadable (file: File) = {
if (null != file && file.exists() && file.canRead() ) {
true
} else {
false
}
}
/**
*关闭打开的文件流
* @param stream
*/
def closeStream (stream: Closeable) {
if (null != stream) {
try {
stream.close
} catch {
case ex:Throwable => Log.error( "[" +this.getClass.getName + ".closeStream] ", ex.getMessage)
}
}
}
}
package com.datacrafts.digitalevers.sparseMatrix
import java.io.FileWriter
import com.datacrafts.digitalevers.curry.FileReader.{closeStream, getLines, isReadable}
import com.google.gson.Gson
import scala.collection.mutable.ArrayBuffer
/**
* 此例程提供Scala对稀疏矩阵进行压缩解压模拟的操作方法
* 通用方式
*/
object normal{
implicit val filePath = ""
def main(args: Array[String]): Unit = {
/*var test = Array.ofDim[Int](11,3)
test(5)(2) = 11
test(3)(1) = 5
var resJson = zipMatrix(test)(checkMatrix)("json.txt")*/
val arr = unzipMatrix("json.txt")
}
/**
* 压缩稀疏矩阵并将其保存到文件中
*/
def zipMatrix(inArray:Array[Array[Int]])(check:(Array[Array[Int]])=>Array[Int])(implicit filePath:String) = {
val newMatrix = ArrayBuffer[Array[Int]]()
val res:Array[Int] = check(inArray)
newMatrix.append(Array(res(0),res(1),0))
for(i <- 0 until inArray.length){
for(j <- 0 until inArray(i).length){
if(inArray(i)(j) != 0){
newMatrix.append(Array(i,j,inArray(i)(j)))
}
}
}
//print(filePath)
val arr = newMatrix.toArray //ArrayBuffer -> Array -> json
val gson = new Gson()
val json = gson.toJson(arr)
if(filePath.isEmpty == true){
json
} else {
save2File(filePath,json)
}
}
/**
* 从文件中读取数据并解压成稀疏矩阵
*/
def unzipMatrix(filePath:String) = {
val content = getLines(filePath) (isReadable) (closeStream)
if(content == ""){
throw new Exception("文件内容为空")
}
val res = ArrayBuffer[Array[Int]]()
val gson = new Gson()
val arr = gson.fromJson(content,classOf[Array[Array[Int]]])
for(i <- 0 until arr.length){
if(i == 0){
for(j <- 0 until arr(i)(0)){
res.append(Array.ofDim[Int](arr(i)(1)))
}
} else {
res(arr(i)(0))(arr(i)(1)) = arr(i)(2)
}
}
res.toArray
}
/**
* 检测数组的容积(长和宽)
* @param inArray
* @return
*/
def checkMatrix(inArray:Array[Array[Int]]):Array[Int] = {
if(inArray.length > 0){
val i = inArray.length
val j = inArray(0).length
Array(i,j)
} else {
Array(0,0)
}
}
/**
* 保存字符串到文件中
* @param filePath 文件路径
* @param content 需要保存的字符内容
* @return 保证成功 true 保存失败 false
*/
def save2File(filePath:String,content:String): Boolean = {
try {
val out = new FileWriter(filePath, true)
out.write(content)
out.close()
true
} catch {
case ex: Throwable => {
false
}
}
}
////////////////
}
下面是泛型方式,在开发泛型方式的时候有一个小插曲
最开始使用Play Framework框架中的json解析库(play.api.libs.json)无法解析泛型的数组结构,更换到gson解析库,便可正常解析
package com.datacrafts.digitalevers.sparseMatrix
import com.datacrafts.digitalevers.curry.FileReader.{closeStream, getLines, isReadable}
import com.datacrafts.digitalevers.sparseMatrix.normal.{save2File, unzipMatrix}
import com.google.gson.Gson
import scala.collection.mutable.ArrayBuffer
/**
* 此例程提供Scala对稀疏矩阵进行压缩解压模拟的操作方法
* 泛型方式
*/
object generic{
implicit val filePath = ""
def main(args: Array[String]): Unit = {
var test = Array.ofDim[Double](11,3)
test(5)(2) = 11.1
test(3)(1) = 5.2
var res = zipMatrix(test)(checkMatrix)("json_generic.txt")
val arr = unzipMatrix("json_generic.txt")
for(line <- arr){
for(elem <- line){
print(elem.toString + " ")
}
println()
}
}
/**
* 压缩稀疏矩阵并将其保存到文件中
*/
def zipMatrix[T](inArray:Array[Array[T]])(check:(Array[Array[T]])=>Array[Int])(implicit filePath:String) = {
val newMatrix = ArrayBuffer[Array[Any]]()
val res:Array[Int] = check(inArray)
newMatrix.append(Array(res(0),res(1),0))
for(i <- 0 until inArray.length){
for(j <- 0 until inArray(i).length){
if(inArray(i)(j) != 0){
newMatrix.append(Array(i,j,inArray(i)(j)))
}
}
}
val arr = newMatrix.toArray
val gson = new Gson()
val json = gson.toJson(arr)
if(filePath.isEmpty == true){
json
} else {
save2File(filePath,json)
}
}
/**
* 因为泛型的压缩数组数据类型不确定
* 所以这个隐式函数提供给 unzipMatrix 作隐式类型转换
* @param in
* @return
*/
implicit def any2Int(in:Any):Int = {
in.toString.toDouble.toInt
}
/**
* 从文件中读取数据并解压成稀疏矩阵
*/
def unzipMatrix(filePath:String) = {
val content = getLines(filePath) (isReadable) (closeStream)
if(content == ""){
throw new Exception("文件内容为空")
}
val res = ArrayBuffer[Array[Any]]()
val gson = new Gson()
val arr = gson.fromJson(content,classOf[Array[Array[Any]]])
for(i <- 0 until arr.length){
if(i == 0){
for(j <- 0 until arr(0)(0)){
val line = ArrayBuffer[Any]()
for(k <- 0 until arr(0)(1)){
line.append(0)
}
res.append(line.toArray)
}
///初始化
} else {
res(arr(i)(0))(arr(i)(1)) = arr(i)(2)
}
}
res.toArray
}
/**
* 检测数组的容积(长和宽)
* @param inArray
* @tparam T
* @return
*/
def checkMatrix[T](inArray:Array[Array[T]]):Array[Int] = {
if(inArray.length > 0){
val i = inArray.length
val j = inArray(0).length
Array(i,j)
} else {
Array(0,0)
}
}
}
package com.datacrafts.digitalevers.curry
import java.io.{BufferedReader, Closeable, File, FileReader}
import com.esotericsoftware.minlog.Log
/**
* 这段代码使用函数柯里化的功能实现了读取文件源并将其转为List数据
*
* 函数柯里化的精髓是使得函数在依赖其他函数的情况下实现可插拔,从而在函数与函数之间的调用上进行解耦
* 比如这段代码也可以在getLines底层函数上只保留 filename 一个参数,在函数体内直接调用 isReadable 和 closeStream 方法来进行
* 周边逻辑的实现,可以达到同样的效果,但是如果将来需要修改 isReadable 和 closeStream 的功能,不得不修改这两个方法的源代码
* 如果采用柯里化的形式 可以重新编写一个 isReadable 和 closeStream 的实现,在上层调用的时候直接将新函数作为参数传入即可
* 从而符合ocp原则。
*/
object File2List {
def main(args: Array[String]): Unit = {
var list = readFile("1.txt")
print(list)
}
def readFile (filename: String): List[String] = {
getLines(filename) (isReadable) (closeStream)
}
/**
* 柯里化后的函数
* @param filename
* @param isFileReadable
* @param closableStream
* @return
*/
private def getLines (filename: String) (isFileReadable: (File) => Boolean) (closableStream: (Closeable) => Unit): List[String] = {
val file = new File(filename)
if (isFileReadable (file) ) {
val readerStream = new FileReader(file)
val buffer = new BufferedReader(readerStream)
try {
var list: List[String] = List ()
var str = ""
var isReadOver = false
while (! isReadOver) {
str = buffer.readLine ()
if (str == null) isReadOver = true
else list = str :: list
}
list.reverse
} finally {
closableStream (buffer)
closableStream (readerStream)
}
} else {
List ()
}
}
/**
* 判断文件是否可读
* @param file
* @return
*/
def isReadable (file: File) = {
if (null != file && file.exists() && file.canRead() ) {
true
} else {
false
}
}
/**
*关闭打开的文件流
* @param stream
*/
def closeStream (stream: Closeable) {
if (null != stream) {
try {
stream.close
} catch {
case ex => Log.error( "[" +this.getClass.getName + ".closeStream] ", ex.getMessage)
}
}
}
}
无论实例对象的类型被定义成基类类型还是子类类型,都优先执行实例化目标类中的方法,若该类中无此方法,则往上寻找去执行基类的该方法
class Base{
def show():Unit = {
printf("Base")
}
}
class Sub extends Base{
override def show(): Unit = {
printf("Sub")
}
}
object Test {
def main(args: Array[String]): Unit = {
val sub:Sub = new Sub
sub.show()
}
}
上述代码基类和子类分别有一个show方法,子类对其进行了重写,最后的输出结果是打印了 Sub 字符串,可见Sub子类中的show方法得到了执行。关键的来了,即使 val sub:Sub = new Sub 这行改为了
var sub:Base = new Sub
最后打印结果依旧没变,可见印证了上述结论
然后对该段代码稍作修改,注释掉子类的 show 方法,再看一下结果
class Base{
def show():Unit = {
printf("Base")
}
}
class Sub extends Base{
/*override def show(): Unit = {
printf("Sub")
}*/
}
object Test {
def main(args: Array[String]): Unit = {
val sub:Sub = new Sub
sub.show()
}
}
这是发现打印结果变成了 Sub 字符串,这是因为子类中的确是没有该方法了,不得已去基类查找,最后执行基类的show方法
若基类子类的属性不存在覆盖行为,则方法对属性的获取是就近原则,若存在覆盖行为,则方法永远获取子类的属性
不存在覆盖的行为指属性都由 private 修饰,就近原则指基类的方法获取基类的属性,子类的方法获取子类的属性
class Base{
private val name = "Base"
def show():Unit = {
printf(this.name)
}
}
class Sub extends Base{
private val name = "Sub"
}
object Test {
def main(args: Array[String]): Unit = {
val sub:Base = new Sub
sub.show()
}
}
上述代码执行基类的show方法,由于name属性都被private修饰,不存在覆盖,采取就近原则,最后打印结果是基类的name值——”Base”
class Base{
val name = "Base"
def show():Unit = {
printf(this.name)
}
}
class Sub extends Base{
override val name = "Sub"
}
object Test {
def main(args: Array[String]): Unit = {
val sub:Base = new Sub
sub.show()
}
}
然后对其进行改写,使得name属性被覆写,这时发现虽然执行的还是基类的show方法,但是打印的是子类的name值——”Sub”,印证了这一结论
技术环境:Scala2.12.10 因为各版本间可能存在差异,所以其他版本的表现只能由读者自己去验证了,此只作为该Scala版本的验证依据
Scala的继承类构造函数的执行顺序永远遵循下面两个原则
无论入口在哪,首先执行主构造器,再执行辅助构造器
若存在继承关系,则首先执行基类的构造器,再执行子类的构造器
下面的代码可以验证这两个原则
package com.datacrafts.digitalevers.test
object Test {
def main(args: Array[String]): Unit = {
val test = new Sub(123,"zhangsan")
}
}
class Base(){
println("Base主构造器执行完毕~")
def this(inAge:Int){
this
println("Base第一个辅助构造器执行完毕~")
}
}
class Sub(inAge:Int) extends Base(inAge:Int){
var name:String = ""
var age:Int = 10
println("Sub主构造器执行完毕~")
def this(name:String) = {
this(10)
this.name = name
println("Sub第一个辅助构造器执行完毕~")
}
def this(age:Int,name:String) = {
this(age)
this.age = age
this.name = name
println("Sub第二个辅助构造器执行完毕~")
}
}
最后的输出结果
Base主构造器执行完毕~
Base第一个辅助构造器执行完毕~
Sub主构造器执行完毕~
Sub第二个辅助构造器执行完毕~
这里补充两个注意事项
Scala类的主构造器和辅助构造器之间及辅助构造器与辅助构造器之间都不能存在雷同的参数样式。这里的雷同是指参数个数相同并且参数的类型也一致,因为只要这样的雷同函数发生,Scala的类在实例化的时候就会很迷惑,不知道该何去何从,该执行哪个构造器
如果类之间存在继承关系,既不能在子类的辅助构造器内也不能子类的主构造器内显式调用super来执行基类的构造器,只能由子类的主构造器隐式调用。而基类构造器需要的参数也需要在子类的主构造器上传递过来,如下形式
class Sub(inAge:Int) extends Base(inAge:Int){
………..
}
技术环境:CentOS6.5 + kafka-manager2.0 + kafka_2.12-2.5.0
技术背景:为了方便对kafka内部数据进行监控和管理,便找到了kafka-manager,这次使用的是2.0版本,其最新github版本已升至3.0.0.5
下载和安装
kafka-manager由yahoo开源,可以到yahoo的github主页下载已经编译好的二进制以及未编译的源代码
下面是主页地址
https://github.com/yahoo/CMAK/releases
直接可以下载运行的二进制文件
https://github.com/yahoo/CMAK/releases/download/3.0.0.5/cmak-3.0.0.5.zip
需要编译的源代码
https://github.com/yahoo/CMAK/archive/3.0.0.5.zip
如果使用源码,那会是一个漫长的编译过程,整个流程另起篇幅讲解,这里不再讲述,这里只讲解二进制版本
二进制版本可以直接运行,首先需要配置 bin/application.conf 文件,这个配置比较简单,只需要配置一行获取到zookeeper的位置就行了,如下所示
kafka-manager.zkhosts=”localhost:2181″
#kafka-manager.zkhosts=${?ZK_HOSTS}
找到kafka-manager.zkhosts=${?ZK_HOSTS}这一行,注释掉,然后添加一行kafka-manager.zkhosts=”localhost:2181″
因为只有一个kafka节点,使用的是其自带的zookeeper,所以是localhost:2181,如果有一个集群,可以在后面追加。比如
kafka-manager.zkhosts=”ip1:2181,ip2:2181″
也可以通过直接使用ZK_HOSTS环境变量来进行配置,比如
讲ZK_HOSTS=”my.zookeeper.host.com:2181″添加至.bashrc文件中
两种风格看个人喜好,皆可行
配置好后直接启动(保证zookeeper和kafka的服务正常)
bin/kafka-manager -Dconfig.file=conf/application.conf
kafka-manager的默认访问端口是9000,如果不是9000,也可以在启动时通过-Dhttp.port参数来指定具体的端口号,如下所示,便将其修改为8080
bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8080 &
正常启动后,通过浏览器访问 http://localhost:8080/ 即可看到kafka-manger的界面
近期评论