Fork me on GitHub

网站日志实时分析之 Flink 处理实时热门和 PVUV 统计

实时热门统计

操作步骤:

  • 先从Kafka读取消费数据
  • 使用map算子对数据进行预处理
  • 过滤数据,只留住pv数据
  • 使用timewindow,每隔10秒创建一个20秒的window
  • 然后对窗口进行自定义预聚合,并且自定义窗口函数,按指定的输入输出样例类(case class)操作数据
  • 上面操作之后返回的是DataStream,那么就根据windowEnd进行keyBy
  • 使用底层API操作,对每个时间窗口内的数据进行排序,取top
package com.ongbo.hotAnalysis

import java.sql.Timestamp
import java.util.Properties

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/**
 * Input record: one parsed line of the user-behavior log.
 *
 * @param userId     user id (first CSV field)
 * @param itemId     item id (second CSV field)
 * @param cateGoryId category id (third CSV field)
 * @param behavior   behavior type (fourth CSV field); the job keeps only records equal to "pv"
 * @param timestamp  event time (fifth CSV field); it is multiplied by 1000 before being used
 *                   as the event-time timestamp, so presumably epoch seconds - confirm against the data
 */
case class UserBehavior(
  userId: Long,
  itemId: Long,
  cateGoryId: Int,
  behavior: String,
  timestamp: Long
)
/**
 * Result of the window aggregation: the view count of one item in one window.
 *
 * @param itemId    id of the counted item
 * @param windowEnd end timestamp of the window the count belongs to
 * @param count     number of records counted for the item in that window
 */
case class ItemViewCount(
  itemId: Long,
  windowEnd: Long,
  count: Long
)

/**
 * Streaming job that reads user-behavior lines from the Kafka topic "weblog",
 * counts "pv" records per item in sliding event-time windows, and prints the
 * ten most viewed items of every window.
 */
object HotItems {
  def main(args: Array[String]): Unit = {
    // 1. Execution environment: parallelism 1, event-time semantics.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 2. Kafka consumer configuration.
    val kafkaProps = new Properties()
    Seq(
      "bootstrap.servers" -> "114.116.219.197:5008,114.116.220.98:5008,114.116.199.154:5008",
      "group.id" -> "web-consumer-group",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "latest"
    ).foreach { case (name, setting) => kafkaProps.setProperty(name, setting) }

    // Raw text lines from Kafka. (For local experiments the source can be swapped
    // for env.readTextFile on a UserBehavior.csv file.)
    val rawLines = env.addSource(
      new FlinkKafkaConsumer[String]("weblog", new SimpleStringSchema(), kafkaProps))

    // Parse each comma-separated line into a UserBehavior and assign event time.
    // The timestamp field is scaled by 1000 to obtain milliseconds.
    val dataStream = rawLines
      .map(line => {
        System.out.println("data:" + line)
        val fields = line.split(",")
        UserBehavior(
          fields(0).trim.toLong,
          fields(1).trim.toLong,
          fields(2).trim.toInt,
          fields(3).trim,
          fields(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // 3a. Keep only "pv" records, group by item, and count per sliding window:
    //     window size 20 seconds, slide 10 seconds.
    val itemCountsPerWindow = dataStream
      .filter(record => record.behavior == "pv")
      .keyBy(_.itemId)
      .timeWindow(Time.seconds(20), Time.seconds(10))
      .aggregate(new CountAgg(), new WindowResult())

    // 3b. Regroup the per-item counts by window end and rank them within each window.
    val processStream = itemCountsPerWindow
      .keyBy(_.windowEnd)
      .process(new TopNHotItems(10))

    // 4. Sink: print the formatted ranking.
    processStream.print("processStream::")

    // 5. Run the job.
    env.execute("hot Items Job")
  }
}

/**
 * Incremental pre-aggregation that counts records.
 * The accumulator and the result are both the running count.
 */
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {

  /** The count starts at zero. */
  override def createAccumulator(): Long = 0L

  /** Every incoming record increases the count by one; the record content is ignored. */
  override def add(in: UserBehavior, acc: Long): Long = 1L + acc

  /** The result is the accumulated count itself. */
  override def getResult(acc: Long): Long = acc

  /** Two partial counts merge by addition. */
  override def merge(acc: Long, acc1: Long): Long = acc1 + acc
}

/**
 * Window function that wraps the pre-aggregated count of one key into an
 * [[ItemViewCount]] carrying the key and the window's end timestamp.
 */
class WindowResult() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {

  /**
   * @param key    the key of the window (used as the item id)
   * @param window the window being evaluated; only its end timestamp is used
   * @param input  the aggregated values; only the first element is read
   * @param out    collector receiving the single ItemViewCount
   */
  override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
    val viewCount = input.head
    val summary = ItemViewCount(itemId = key, windowEnd = window.getEnd, count = viewCount)
    out.collect(summary)
  }
}

/**
 * Keyed process function that buffers every [[ItemViewCount]] of the current key
 * in list state and, when an event-time timer fires, emits a formatted ranking of
 * the `topsize` entries with the highest counts.
 *
 * @param topsize maximum number of entries to include in each emitted ranking
 */
class TopNHotItems(topsize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {
  // Buffer of the counts received for the current key; initialised in open().
  private var itemState: ListState[ItemViewCount] = _

  /** Obtains the list state handle from the runtime context. */
  override def open(parameters: Configuration): Unit = {
    itemState = getRuntimeContext.getListState(
      new ListStateDescriptor[ItemViewCount]("item-state", classOf[ItemViewCount]))
  }

  /** Buffers the element and registers an event-time timer one millisecond after its window end. */
  override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = {
    itemState.add(value)
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  /**
   * Sorts the buffered counts in descending order, keeps the first `topsize`,
   * clears the buffer and emits one formatted report.
   *
   * @param timestamp the timer's timestamp (window end + 1, as registered in processElement)
   * @param ctx       timer context (unused)
   * @param out       collector receiving the formatted ranking
   */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    // Explicit .asScala conversion instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._

    // sortBy is ascending by default, hence the reversed ordering.
    val rankedItems = itemState.get().asScala.toList
      .sortBy(_.count)(Ordering.Long.reverse)
      .take(topsize)

    // The buffered entries are no longer needed once the ranking is computed.
    itemState.clear()

    val result = new StringBuilder
    // The timer was registered at windowEnd + 1, so subtract 1 to show the window end.
    result.append("时间:").append(new Timestamp(timestamp - 1)).append("\n")
    rankedItems.zipWithIndex.foreach { case (item, idx) =>
      result.append("No").append(idx + 1).append(":")
        .append("  商品ID:").append(item.itemId)
        .append("  浏览量:").append(item.count).append("\n")
    }
    result.append("============================\n")

    // No Thread.sleep here: sleeping inside onTimer blocks the operator's processing
    // thread. If the output has to be paced, do that in the consumer of this stream.
    out.collect(result.toString())
  }
}
/**
 * Pre-aggregation that computes the arithmetic mean of the `timestamp` field of
 * the records it receives. The accumulator is the pair (sum of timestamps, record count).
 */
class AverageAgg() extends AggregateFunction[UserBehavior, (Long, Int), Double] {

  /** Starts with a zero sum and a zero count. */
  override def createAccumulator(): (Long, Int) = (0L, 0)

  /** Adds the record's timestamp to the sum and increments the count. */
  override def add(in: UserBehavior, acc: (Long, Int)): (Long, Int) = (acc._1 + in.timestamp, acc._2 + 1)

  /**
   * Returns sum / count as a floating-point value.
   *
   * The sum is converted to Double before dividing; dividing the Long sum by the
   * Int count directly would be integer division and silently drop the fractional part.
   * For an empty accumulator (count 0) the result is NaN.
   */
  override def getResult(acc: (Long, Int)): Double = acc._1.toDouble / acc._2

  /** Merges two partial accumulators component-wise. */
  override def merge(acc: (Long, Int), acc1: (Long, Int)): (Long, Int) = (acc._1 + acc1._1, acc._2 + acc1._2)
}

实时PV统计

这里按道理应该也要从Kafka读取数据的,但是这里暂时先从本地读,因为当时本地网络的原因,暂时不在服务器上创建数据,而直接用本地的。

这个很简单,直接创建滚动窗口,从而能够计算一个小时的PV,然后每隔一个小时更新一次。

package com.ongbo.NetWorkFlow_Analysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Input record: one parsed line of the UserBehavior CSV data.
 *
 * @param userId     user id (first CSV field)
 * @param itemId     item id (second CSV field)
 * @param cateGoryId category id (third CSV field)
 * @param behavior   behavior type (fourth CSV field); the job keeps only records equal to "pv"
 * @param timestamp  event time (fifth CSV field); it is multiplied by 1000 before being used
 *                   as the event-time timestamp, so presumably epoch seconds - confirm against the data
 */
case class UserBehavior(
  userId: Long,
  itemId: Long,
  cateGoryId: Int,
  behavior: String,
  timestamp: Long
)

/**
 * Job that counts page views ("pv" records) per one-hour tumbling event-time
 * window, reading the records from the UserBehavior.csv classpath resource.
 */
object PageVies {
  def main(args: Array[String]): Unit = {
    // Event-time job running with parallelism 1.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Locate the data set through the classpath instead of an absolute path.
    val csvPath = getClass.getResource("/UserBehavior.csv").getPath

    // Parse every comma-separated line and assign event time
    // (the timestamp field is scaled by 1000 to obtain milliseconds).
    val behaviors = env.readTextFile(csvPath)
      .map(line => {
        val fields = line.split(",")
        UserBehavior(
          fields(0).trim.toLong,
          fields(1).trim.toLong,
          fields(2).trim.toInt,
          fields(3).trim,
          fields(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000L)

    // Map each "pv" record to ("pv", 1) and sum the ones per one-hour window.
    val dataStream = behaviors
      .filter(record => record.behavior == "pv")
      .map(_ => ("pv", 1))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .sum(1)

    dataStream.print("pv count")
    env.execute("PV")
  }
}

实时UV统计:布隆过滤器

我们统计UV需要注意,大量重复的user会占用内存,所以我们采用布隆过滤器进行优化,避免Flink缓存全部user而导致性能下降。同时将count数据保存在Redis中,可以供后端使用。
package com.ongbo.NetWorkFlow_Analysis

import com.ongbo.NetWorkFlow_Analysis.UniqueView.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis

/**
 * Job that counts unique visitors per one-hour event-time window.
 * Every "pv" record is routed to a single key, and the window is evaluated once
 * per element by [[MyTrigger]]; [[UvCountWithBloom]] does the de-duplication.
 */
object UvWithBloom {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Locate the data set through the classpath. The lookup result is now actually
    // used (it was previously ignored in favour of a machine-specific absolute path),
    // and a missing resource fails with a clear message instead of a bare NPE.
    val resource = Option(getClass.getResource("/UserBehavior.csv"))
      .getOrElse(throw new IllegalStateException("UserBehavior.csv not found on the classpath"))

    val dataStream = env.readTextFile(resource.getPath)
      .map(data => {
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
      })
      // The timestamp field is scaled by 1000 to obtain milliseconds.
      .assignAscendingTimestamps(_.timestamp * 1000L)
      .filter(_.behavior.equals("pv"))
      // Use one constant key so all records end up in the same keyed window.
      .map(data => ("dummyKey", data.userId))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .trigger(new MyTrigger())
      .process(new UvCountWithBloom())

    dataStream.print()
    env.execute()
  }
}


/**
 * Window trigger that evaluates and purges the window for every incoming
 * element, and never fires on processing-time or event-time timers.
 */
class MyTrigger() extends Trigger[(String, Long), TimeWindow] {

  /** Each element immediately fires the window and discards its contents. */
  override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE_AND_PURGE

  /** Processing-time timers do nothing. */
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }

  /** Event-time timers do nothing. */
  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }

  /** The trigger keeps no state of its own, so there is nothing to clean up. */
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = ()
}
/**
 * Window function that maintains a unique-visitor count per window in Redis.
 *
 * For each window a Redis bitmap (keyed by the window end) records which hash
 * offsets have been seen, and the Redis hash "count" stores the running count
 * under the same window-end key. Each invocation emits the current count.
 */
class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow] {
  // Redis connection, created on first use.
  lazy val jedis = new Jedis("114.116.219.97", 5000)
  // 1 << 29 bits, i.e. 64 MB of bitmap.
  lazy val bloom = new Bloom(1 << 29)

  /**
   * @param key      the window key (unused)
   * @param context  window context; only the window end timestamp is read
   * @param elements the window contents; only the last element's user id is examined
   * @param out      collector receiving one UvCount per invocation
   */
  override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
    // Bitmap key and field name in the "count" hash: the window end as a string.
    val storeKey = context.window.getEnd.toString

    // Read the stored count once; a missing entry (null) means nothing was counted yet.
    val count = Option(jedis.hget("count", storeKey)).map(_.toLong).getOrElse(0L)

    // Hash the user id to a bit offset and test whether that bit is already set.
    val userId = elements.last._2.toString
    val offset = bloom.hash(userId, 61)
    val isExist = jedis.getbit(storeKey, offset)
    if (!isExist) {
      // Bit not set yet: mark it, persist the incremented count and emit it.
      jedis.setbit(storeKey, offset, true)
      jedis.hset("count", storeKey, (count + 1).toString)
      out.collect(UvCount(storeKey.toLong, count + 1))
    } else {
      // Bit already set: emit the unchanged count.
      out.collect(UvCount(storeKey.toLong, count))
    }
  }

  /** Releases the Redis connection when the function is shut down. */
  override def close(): Unit = {
    jedis.close()
  }
}

/**
 * Hash helper for a bitmap of `size` bits.
 *
 * @param size requested bitmap size; a non-positive value falls back to 1 << 27.
 *             The result of [[hash]] is masked with (size - 1).
 */
class Bloom(size: Long) extends Serializable {
  // Effective bitmap capacity.
  private val cap: Long = if (size <= 0) 1L << 27 else size

  /**
   * Folds the characters of `value` into a polynomial hash with multiplier `seed`
   * and masks the result with (cap - 1).
   *
   * @param value string to hash
   * @param seed  multiplier applied to the running hash before each character is added
   * @return the masked hash value
   */
  def hash(value: String, seed: Int): Long = {
    val mixed = value.foldLeft(0L)((running, ch) => running * seed + ch)
    mixed & (cap - 1)
  }
}

欢迎点赞+收藏+转发朋友圈素质三连

阅读原文


本文地址:https://www.6aiq.com/article/1597190128196
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出