Monitoring Kafka with JMX

news/2024/7/8 1:46:11

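Monitoring data sources

Start the Broker, Consumer, and Producer with remote JMX (RMI) enabled by adding the JVM options below. Authentication and SSL are disabled here, which is only appropriate on a trusted network.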

-ea -Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.port=9996

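Connect over JMX RMI with a service URL. The port in the URL must match the -Dcom.sun.management.jmxremote.port value of the process you are connecting to, and each process on the same host needs its own port.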

service:jmx:rmi:///jndi/rmi://127.0.0.1:9998/jmxrmi
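The service URL above can be built from a host and port and opened with the standard javax.management.remote API. The following is a minimal sketch, not part of the original post: the names JmxUrlSketch, jmxUrl, and withConnection are hypothetical, and the helper simply makes sure the connector is closed after use.

```scala
import javax.management.MBeanServerConnection
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

// Hypothetical helper object, for illustration only.
object JmxUrlSketch {
  // Builds the RMI service URL in the same shape as the one shown above.
  def jmxUrl(host: String, port: Int): JMXServiceURL =
    new JMXServiceURL(s"service:jmx:rmi:///jndi/rmi://$host:$port/jmxrmi")

  // Opens a connection, runs the body, and always closes the connector.
  def withConnection[A](url: JMXServiceURL)(body: MBeanServerConnection => A): A = {
    val connector = JMXConnectorFactory.connect(url)
    try body(connector.getMBeanServerConnection)
    finally connector.close()
  }
}
```

For example, JmxUrlSketch.withConnection(JmxUrlSketch.jmxUrl("127.0.0.1", 9998))(_.getDomains) would list the domains of a process listening on that port, assuming one is running.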

Monitored data

Broker

bean name: kafka:type=kafka.SocketServerStats (this data is reset on every restart)

def getProduceRequestsPerSecond: Double
def getFetchRequestsPerSecond: Double
def getAvgProduceRequestMs: Double
def getMaxProduceRequestMs: Double
def getAvgFetchRequestMs: Double
def getMaxFetchRequestMs: Double
def getBytesReadPerSecond: Double
def getBytesWrittenPerSecond: Double
def getNumFetchRequests: Long
def getNumProduceRequests: Long
def getTotalBytesRead: Long
def getTotalBytesWritten: Long
def getTotalFetchRequestMs: Long
def getTotalProduceRequestMs: Long

bean name: kafka:type=kafka.BrokerAllTopicStat (this data is reset on every restart)
bean name: kafka:type=kafka.BrokerTopicStat.topic (this data is reset on every restart)

def getMessagesIn: Long  number of messages written
def getBytesIn: Long   number of bytes written
def getBytesOut: Long   number of bytes read out
def getFailedProduceRequest: Long   number of failed produce requests
def getFailedFetchRequest: Long  number of failed fetch requests
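Because these counters are cumulative and start again from zero when the process restarts, a monitoring job usually turns two successive readings into a rate and discards the interval in which a restart happened. A minimal sketch of that idea, assuming the counter only decreases on a restart; Sample, CounterRateSketch, and ratePerSecond are hypothetical names, not Kafka APIs.

```scala
// Hypothetical helper, for illustration only.
object CounterRateSketch {
  // One reading of a cumulative counter such as getBytesIn.
  final case class Sample(timestampMs: Long, value: Long)

  // Returns the per-second rate between two readings, or None when the
  // counter went backwards (assumed restart) or no time has elapsed.
  def ratePerSecond(prev: Sample, curr: Sample): Option[Double] = {
    val elapsedMs = curr.timestampMs - prev.timestampMs
    val delta = curr.value - prev.value
    if (elapsedMs <= 0 || delta < 0) None
    else Some(delta * 1000.0 / elapsedMs)
  }
}
```

Returning None rather than a negative number keeps a restart from showing up as a spurious spike in a chart.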

Less important attributes

bean name: kafka:type=kafka.LogFlushStats

def getFlushesPerSecond: Double
def getAvgFlushMs: Double
def getTotalFlushMs: Long
def getMaxFlushMs: Double
def getNumFlushes: Long

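bean name: kafka:type=kafka.logs.topic-partition (one bean per topic partition, for example kafka:type=kafka.logs.guoguo_t_1-0)

def getName: String    name of the monitored item, in the form topic + "-" + partition ID, for example guoguo_t_1-0, guoguo_t_1-1
def getSize: Long size of the persisted log files
def getNumberOfSegments: Int  number of persisted log files (segments)
def getCurrentOffset: Long   byte offset of the file Kafka is currently writing to
def getNumAppendedMessages: Long    number of appended messages, reset on every restart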
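The getName format above (topic, a hyphen, then the partition ID) can be split back into its parts. Since a topic name may itself contain hyphens, this sketch splits on the last one; LogNameSketch and parse are hypothetical names used only for illustration.

```scala
import scala.util.Try

// Hypothetical helper, for illustration only.
object LogNameSketch {
  // Splits "topic-partitionId" on the last hyphen.
  // Returns None when there is no hyphen or the suffix is not an integer.
  def parse(name: String): Option[(String, Int)] = {
    val idx = name.lastIndexOf('-')
    if (idx <= 0 || idx == name.length - 1) None
    else Try(name.substring(idx + 1).toInt).toOption
      .map(partition => (name.substring(0, idx), partition))
  }
}
```

With the names from the example, LogNameSketch.parse("guoguo_t_1-0") yields the topic guoguo_t_1 and partition 0.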

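Other items worth monitoring:

Java heap (heap memory usage, non-heap memory usage, and so on)
GC information (number of collections, total GC time, and so on)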
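The heap and GC figures listed above come from the standard java.lang platform MBeans, so they can be read without any Kafka classes. A minimal sketch, assuming an already opened MBeanServerConnection; JvmStatsSketch, GcStat, heapUsage, and gcStats are hypothetical names introduced for illustration.

```scala
import java.lang.management.MemoryUsage
import javax.management.{MBeanServerConnection, ObjectName}
import javax.management.openmbean.CompositeData

// Hypothetical helper, for illustration only.
object JvmStatsSketch {
  final case class GcStat(name: String, count: Long, timeMs: Long)

  // Reads HeapMemoryUsage from java.lang:type=Memory and converts the
  // CompositeData value into a MemoryUsage.
  def heapUsage(conn: MBeanServerConnection): MemoryUsage = {
    val data = conn.getAttribute(new ObjectName("java.lang:type=Memory"), "HeapMemoryUsage")
    MemoryUsage.from(data.asInstanceOf[CompositeData])
  }

  // Reads collection count and accumulated time (ms) for every collector.
  def gcStats(conn: MBeanServerConnection): Seq[GcStat] = {
    val pattern = new ObjectName("java.lang:type=GarbageCollector,name=*")
    val names = conn.queryNames(pattern, null).toArray(new Array[ObjectName](0))
    names.toSeq.map { n =>
      GcStat(
        n.getKeyProperty("name"),
        conn.getAttribute(n, "CollectionCount").asInstanceOf[Long],
        conn.getAttribute(n, "CollectionTime").asInstanceOf[Long])
    }
  }
}
```

To try it without a remote process, pass java.lang.management.ManagementFactory.getPlatformMBeanServer, which is itself an MBeanServerConnection for the current JVM.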

Consumer


Consumer state
bean name: kafka:type=kafka.ConsumerStats

def getPartOwnerStats: String
For example: guoguo_t_1: [
  {
       0-1,  //  broker ID and partition ID
       fetchoffset: 58246,  // offset fetched so far
       consumeroffset: 58246  // offset consumed so far
   }{ 0-0,  fetchoffset: 2138747,consumeroffset: 2138747}]
def getConsumerGroup: String    the consumer group, for example guoguo_group_1
def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long  number of bytes of messages not yet read
def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long number of bytes of data already consumed
def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long

bean name: kafka:type=kafka.ConsumerAllTopicStat (aggregate over all topics; also reset on restart)

bean name: kafka:type=kafka.ConsumerTopicStat.topic (also reset on restart)

def getMessagesPerTopic: Long
def getBytesPerTopic: Long

bean name: kafka:type=kafka.SimpleConsumerStats

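def getFetchRequestsPerSecond: Double fetch requests issued per second
def getAvgFetchRequestMs: Double average time of a fetch request, in ms
def getMaxFetchRequestMs: Double maximum time of a fetch request, in ms
def getNumFetchRequests: Long number of fetch requests
def getConsumerThroughput: Double consumer throughput, in bytes per second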

Producer

bean name: kafka:type=kafka.KafkaProducerStats

def getProduceRequestsPerSecond: Double
def getAvgProduceRequestMs: Double
def getMaxProduceRequestMs: Double
def getNumProduceRequests: Long

bean name: kafka.producer.Producer:type=AsyncProducerStats

def getAsyncProducerEvents: Int (number of messages sent; it should not differ much from the sum of getMessagesPerTopic across all consumers)
def getAsyncProducerDroppedEvents: Int

Demo program

package com.campaign.kafka
 
import javax.management._
import kafka.log.LogStatsMBean
import kafka.network.SocketServerStatsMBean
import kafka.server.BrokerTopicStatMBean
import javax.management.openmbean.CompositeData
import java.lang.management.{MemoryUsage, GarbageCollectorMXBean}
import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
 
 
/**
 * Created by jiaguotian on 14-1-13.
 */
object RmiMonitor {
  def main(args: Array[String]): Unit = {
    val jmxUrl: JMXServiceURL = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:9999/jmxrmi")
   val connector: JMXConnector = JMXConnectorFactory.connect(jmxUrl)
    val mBeanServerconnection: MBeanServerConnection = connector.getMBeanServerConnection

   val domains: Array[String] = mBeanServerconnection.getDomains
    println("domains:")
   for (domain <- domains) {
     println("%25s:  %s".format("domain", domain))
    }

    println("-------------------------------")
   val beanSet: java.util.Set[ObjectInstance] = mBeanServerconnection.queryMBeans(null, null)
   val beans: Array[ObjectInstance] = beanSet.toArray(new Array[ObjectInstance](0)).sortWith((o1, o2) => o1.getClassName.compare(o2.getClassName) < 0)
    for (instance <- beans) {
     val objectName: ObjectName = instance.getObjectName
      println("%s %s".format(instance.getClassName, objectName))
    }

    println("-------------------------------")

  {
    val instance: ObjectName = ObjectName.getInstance("kafka:type=kafka.SocketServerStats")
     val bean: SocketServerStatsMBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection,
       instance,
       classOf[SocketServerStatsMBean],
        true)
   println(instance.getCanonicalKeyPropertyListString)
     println("%25s:  %s".format("AvgFetchRequestMs", bean.getAvgFetchRequestMs))
     println("%25s:  %s".format("AvgProduceRequestMs", bean.getAvgProduceRequestMs))
     println("%25s:  %s".format("BytesReadPerSecond", bean.getBytesReadPerSecond))
     println("%25s:  %s".format("BytesWrittenPerSecond", bean.getBytesWrittenPerSecond))
     println("%25s:  %s".format("FetchRequestsPerSecond", bean.getFetchRequestsPerSecond))
     println("%25s:  %s".format("MaxFetchRequestMs", bean.getMaxFetchRequestMs))
     println("%25s:  %s".format("MaxProduceRequestMs", bean.getMaxProduceRequestMs))
     println("%25s:  %s".format("NumFetchRequests", bean.getNumFetchRequests))
     println("%25s:  %s".format("NumProduceRequests", bean.getNumProduceRequests))
     println("%25s:  %s".format("ProduceRequestsPerSecond", bean.getProduceRequestsPerSecond))
    }
     println("-------------------------------");
    {
       val objNames: java.util.Set[ObjectName] = mBeanServerconnection.queryNames(
       ObjectName.getInstance("java.lang:type=Memory*"), null)
       val array: Array[ObjectName] = objNames.toArray(new Array[ObjectName](0))
       for (name <- array) {
         val info: _root_.javax.management.MBeanInfo = mBeanServerconnection.getMBeanInfo(name)
         val attrInfos: Array[_root_.javax.management.MBeanAttributeInfo] = info.getAttributes
         println(name.toString)
         for (info <- attrInfos) {
           println(info.getName + " " + info.getType)
           info.getType match {
           case "javax.management.openmbean.CompositeData" =>
             val attribute: AnyRef = mBeanServerconnection.getAttribute(name, info.getName)
             val bean: MemoryUsage = MemoryUsage.from(attribute.asInstanceOf[CompositeData])
             println("%25s:  %s".format("Committed", bean.getCommitted()))
             println("%25s:  %s".format("Init", bean.getInit()))
             println("%25s:  %s".format("Max", bean.getMax()))
             println("%25s:  %s".format("Used", bean.getUsed()))
             case _ =>
          }
       }
     }
   }
   println("-------------------------------")

    {
     val objNames: java.util.Set[ObjectName] = mBeanServerconnection.queryNames(
        ObjectName.getInstance("java.lang:type=GarbageCollector,name=*"), null)
     val array: Array[ObjectName] = objNames.toArray(new Array[ObjectName](0))
     for (next <- array) {
        val bean: GarbageCollectorMXBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection, next, classOf[GarbageCollectorMXBean], true)
        println("%25s:  %s".format("Name", bean.getName()))
        println("%25s:  %s".format("MemoryPoolNames", bean.getMemoryPoolNames()))
        println("%25s:  %s".format("ObjectName", bean.getObjectName()))
        println("%25s:  %s".format("Class", bean.getClass()))
        println("%25s:  %s".format("CollectionCount", bean.getCollectionCount()))
        println("%25s:  %s".format("CollectionTime", bean.getCollectionTime()))
    }
  }


   val TypeValuePattern = "(.*):(.*)=(.*)".r
   val kafka1: ObjectName = new ObjectName("kafka", "type", "*")
   val kafka: java.util.Set[ObjectInstance] = mBeanServerconnection.queryMBeans(kafka1, null)
   val kafkas: Array[ObjectInstance] = kafka.toArray(new Array[ObjectInstance](0)).sortWith((o1, o2) => o1.getClassName.compare(o2.getClassName) < 0)
     for (instance <- kafkas) {
       val objectName: ObjectName = instance.getObjectName
      println(instance.getClassName + " " + objectName)

      objectName.getCanonicalName match {
        case TypeValuePattern(domain, t, v) =>
          instance.getClassName match {
            case "kafka.log.LogStats" =>
            val oName: ObjectName = new ObjectName(domain, t, v)
            val bean: LogStatsMBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection, oName, classOf[LogStatsMBean], true)
            println("%25s:  %s".format("CurrentOffset", bean.getCurrentOffset))
            println("%25s:  %s".format("Name", bean.getName()))
            println("%25s:  %s".format("NumAppendedMessages", bean.getNumAppendedMessages))
            println("%25s:  %s".format("NumberOfSegments", bean.getNumberOfSegments))
            println("%25s:  %s".format("Size", bean.getSize()))
            case "kafka.message.LogFlushStats" =>
            // LogFlushStats exposes flush metrics, so read them by attribute name
            // rather than through LogStatsMBean.
            val oName: ObjectName = new ObjectName(domain, t, v)
            for (attr <- Seq("FlushesPerSecond", "AvgFlushMs", "MaxFlushMs", "TotalFlushMs", "NumFlushes")) {
              println("%25s:  %s".format(attr, mBeanServerconnection.getAttribute(oName, attr)))
            }
            case "kafka.network.SocketServerStats" =>
            val oName: ObjectName = new ObjectName(domain, t, v)
            val bean: SocketServerStatsMBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection, oName, classOf[SocketServerStatsMBean], true)
            println("%25s:  %s".format("BytesReadPerSecond", bean.getBytesReadPerSecond))
            println("%25s:  %s".format("AvgFetchRequestMs", bean.getAvgFetchRequestMs))
            println("%25s:  %s".format("AvgProduceRequestMs", bean.getAvgProduceRequestMs))
            println("%25s:  %s".format("BytesWrittenPerSecond", bean.getBytesWrittenPerSecond))
            println("%25s:  %s".format("FetchRequestsPerSecond", bean.getFetchRequestsPerSecond))
            println("%25s:  %s".format("MaxFetchRequestMs", bean.getMaxFetchRequestMs))
            println("%25s:  %s".format("MaxProduceRequestMs", bean.getMaxProduceRequestMs))
            println("%25s:  %s".format("NumFetchRequests", bean.getNumFetchRequests))
            println("%25s:  %s".format("NumProduceRequests", bean.getNumProduceRequests))
            println("%25s:  %s".format("ProduceRequestsPerSecond", bean.getProduceRequestsPerSecond))
            println("%25s:  %s".format("TotalBytesRead", bean.getTotalBytesRead))
            case "kafka.server.BrokerTopicStat" =>
            val oName: ObjectName = new ObjectName(domain, t, v)
            val bean: BrokerTopicStatMBean = MBeanServerInvocationHandler.newProxyInstance(mBeanServerconnection, oName, classOf[BrokerTopicStatMBean], true)
            println("%25s:  %s".format("BytesIn", bean.getBytesIn))
            println("%25s:  %s".format("BytesOut", bean.getBytesOut))
            println("%25s:  %s".format("FailedFetchRequest", bean.getFailedFetchRequest))
            println("%25s:  %s".format("FailedProduceRequest", bean.getFailedProduceRequest))
            println("%25s:  %s".format("MessagesIn", bean.getMessagesIn))
            case _ =>
          }
        case _ =>
     }
    }
  }
}

Output

domains:
                   domain:  JMImplementation
                   domain:  com.sun.management
                   domain:  kafka
                   domain:  java.nio
                   domain:  java.lang
                   domain:  java.util.logging
-------------------------------
com.sun.management.UnixOperatingSystem java.lang:type=OperatingSystem
javax.management.MBeanServerDelegate JMImplementation:type=MBeanServerDelegate
kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-1
kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-0
kafka.network.SocketServerStats kafka:type=kafka.SocketServerStats
kafka.utils.Log4jController kafka:type=kafka.Log4jController
sun.management.ClassLoadingImpl java.lang:type=ClassLoading
sun.management.CompilationImpl java.lang:type=Compilation
sun.management.GarbageCollectorImpl java.lang:type=GarbageCollector,name=ConcurrentMarkSweep
sun.management.GarbageCollectorImpl java.lang:type=GarbageCollector,name=ParNew
sun.management.HotSpotDiagnostic com.sun.management:type=HotSpotDiagnostic
sun.management.ManagementFactoryHelper$1 java.nio:type=BufferPool,name=direct
sun.management.ManagementFactoryHelper$1 java.nio:type=BufferPool,name=mapped
sun.management.ManagementFactoryHelper$PlatformLoggingImpl java.util.logging:type=Logging
sun.management.MemoryImpl java.lang:type=Memory
sun.management.MemoryManagerImpl java.lang:type=MemoryManager,name=CodeCacheManager
sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=Par Survivor Space
sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=CMS Perm Gen
sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=Par Eden Space
sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=Code Cache
sun.management.MemoryPoolImpl java.lang:type=MemoryPool,name=CMS Old Gen
sun.management.RuntimeImpl java.lang:type=Runtime
sun.management.ThreadImpl java.lang:type=Threading
-------------------------------
type=kafka.SocketServerStats
     getAvgFetchRequestMs:  0.0
   getAvgProduceRequestMs:  0.0
    getBytesReadPerSecond:  0.0
 getBytesWrittenPerSecond:  0.0
getFetchRequestsPerSecond:  -0.0
     getMaxFetchRequestMs:  0.0
   getMaxProduceRequestMs:  0.0
      getNumFetchRequests:  0
    getNumProduceRequests:  0
getProduceRequestsPerSecond:  -0.0
-------------------------------
java.lang:type=Memory
HeapMemoryUsage javax.management.openmbean.CompositeData
             getCommitted:  3194421248
                  getInit:  3221225472
                   getMax:  3194421248
                  getUsed:  163302248
NonHeapMemoryUsage javax.management.openmbean.CompositeData
             getCommitted:  24313856
                  getInit:  24313856
                   getMax:  136314880
                  getUsed:  14854816
ObjectPendingFinalizationCount int
Verbose boolean
ObjectName javax.management.ObjectName
-------------------------------
                  getName:  ParNew
       getMemoryPoolNames:  [Ljava.lang.String;@23652209
            getObjectName:  java.lang:type=GarbageCollector,name=ParNew
                 getClass:  class com.sun.proxy.$Proxy1
       getCollectionCount:  0
        getCollectionTime:  0
                  getName:  ConcurrentMarkSweep
       getMemoryPoolNames:  [Ljava.lang.String;@2c61bbb7
            getObjectName:  java.lang:type=GarbageCollector,name=ConcurrentMarkSweep
                 getClass:  class com.sun.proxy.$Proxy1
       getCollectionCount:  0
        getCollectionTime:  0
kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-1
            CurrentOffset:  5519897
                     Name:  guoguo_t_1-1
      NumAppendedMessages:  0
         NumberOfSegments:  1
                     Size:  5519897
kafka.log.LogStats kafka:type=kafka.logs.guoguo_t_1-0
            CurrentOffset:  7600338
                     Name:  guoguo_t_1-0
      NumAppendedMessages:  0
         NumberOfSegments:  1
                     Size:  7600338
kafka.network.SocketServerStats kafka:type=kafka.SocketServerStats
       BytesReadPerSecond:  0.0
        AvgFetchRequestMs:  0.0
      AvgProduceRequestMs:  0.0
    BytesWrittenPerSecond:  0.0
   FetchRequestsPerSecond:  -0.0
        MaxFetchRequestMs:  0.0
      MaxProduceRequestMs:  0.0
         NumFetchRequests:  0
       NumProduceRequests:  0
 ProduceRequestsPerSecond:  -0.0
           TotalBytesRead:  0
kafka.utils.Log4jController kafka:type=kafka.Log4jController

 

Reposted from: https://www.cnblogs.com/songanwei/p/9203529.html

