一点做用户画像的人生经验:ID 强打通



转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com

AIQ 机器学习大数据 知乎专栏 点击关注

本文转载自: https://www.cnblogs.com/en-heng
1. 背景

在构建精准用户画像时,面临着这样一个问题:日志采集不能成功地收集用户的所有 ID,且每条业务线有各自定义的 UID 用来标识用户,从而造成了用户 ID 的零碎化。因此,为了做用户标签的整合,用户 ID 之间的强打通(亦称为 ID-Mapping)成了迫切的需求。大概三年前,在知乎上有这样一个与之相类似的问题:如何用 MR 实现并查集以对海量数据 pair 做聚合;目前为止还无人解答。本文将提供一个可能的基于 MR 计算框架的解决方案,以实现大数据下的 ID 强打通。

首先,简要地介绍下 Android 设备常见的 ID:

  • IMEI(International Mobile Equipment Identity),即通常所说的手机序列号、手机“串号”,用于在移动电话网络中识别每一部独立的手机等行动通讯装置;序列号共有 15 位数字,前 6 位(TAC)是型号核准号码,代表手机类型。接着 2 位(FAC)是最后装配号,代表产地。后 6 位(SNR)是串号,代表生产顺序号。最后 1 位(SP)一般为 0,是检验码,备用。

  • MAC(Media Access Control) 一般代指 MAC 位址,为网卡的标识,用来定义网络设备的位置。

  • IMSI(International Mobile SubscriberIdentification Number),储存在 SIM 卡中,可用于区别移动用户的有效信息;其总长度不超过 15 位,同样使用 0~9 的数字。其中 MCC 是移动用户所属国家代号,占 3 位数字,中国的 MCC 规定为 460;MNC 是移动网号码,最多由两位数字组成,用于识别移动用户所归属的移动通信网;MSIN 是移动用户识别码,用以识别某一移动通信网中的移动用户。

  • Android ID 是系统随机生成的设备 ID 为一串 64 位的编码(十六进制的字符串),通过它可以知道设备的寿命(在设备恢复出厂设置或刷机后,该值可能会改变)。

  • IDFA (Identifier for Advertisers) 是苹果推出来的用于广告标识的设备 ID,同一设备上的不同 APP 所获取的 IDFA 是一致的;但是用户可以自主更改 IDFA,所以 IDFA 并不是和设备一一绑定的。

2. 设计

从图论的角度出发,ID 强打通更像是将小连通图合并成一个大连通图;比如,在日志中出现如下三条记录,分别表示三个 ID 集合(小连通图):

通过将三个小连通图合并,便可得到一个大连通图——完整的 ID 集合列表。淘宝明风介绍了如何用 Spark GraphX 通过 outerJoinVertices 等运算符来做大数据下的多图合并;针对 ID 强打通的场景,也可采用类似的思路:日志数据构建大的稀疏图,然后采用自 join 的方式做打通。但是,我并没有选用 GraphX,理由如下:

  • GraphX 只支持有向图,而不支持无向图,而 ID 之间的关联关系是一个无向连通图;

  • GraphX 的 join 操作不完全可控,“不完全可控”是指在做图合并时我们需要做过滤山寨设备、一对多的 ID 等操作,而在 GraphX 封装好的 join 算子上实现过滤操作则成本过高。

因而,基于 MR 计算模型(Spark 框架)我设计新的 ID 打通算法;算法流程如下:打通的 map 阶段将 ID 集合中每一个 Id 做 key 然后进行打散(),Reduce 阶段按 key 做的合并。通过观察发现:仅需要两步 MR 便可完成上述打通的操作。以上面的例子做说明,第一步 MR 完成后,打通 ID 集合为:,第二步 MR 完成后便得到完整的 ID 集合列表。但是,在两步 MR 过程中,所有的 key 都会对应一个聚合结果,而其中一些聚合结果只是中间结果。故而引入了用于保存聚合时的 key 值,加入了第三步 MR,通过比较来对中间聚合结果进行过滤。算法的伪代码如下:

MR step1:
    Map: 
        input: id_set
        process: flatMap id_set;
        output: id -> (id_set, 1)
    Rduce:
        process: reduceByKey
        output: id -> (id_set, empty key_set, int_value)

MR step2:
    Map:
        input: id -> (id_set, empty key_set, int_value)
        process: flatMap id_set, if have id_aggregation, then add key to key_set
        output: id -> (id_set, key_set, int_value)
    Reduce: 
        process: reduceByKey
        output: id -> (id_set, key_set, int_value)

MR step3:
    Map:
        input: id -> (id_set, empty key_set, int_value)
        process: flatMap id_set, if have id_aggregation, then add key to key_set
        output: id -> (id_set, key_set, int_value)
    Reduce: 
        process: reduceByKey
        output: id -> (id_set, key_set, int_value)

Filters:
    process: if have id_aggregation, then add key to key_set
    filter: if no id_aggregation or key_set == id_set
    distinct

3. 实现

针对上述 ID 强打通算法,Spark 实现代码如下:

case class DvcId(id: String, value: String)

val log: RDD[mutable.Set[DvcId]]
// MR1
val rdd1: RDD[(DvcId, (mutable.Set[DvcId], mutable.Set[DvcId], Int))] = log
  .flatMap { set =>
    set.map(t => (t, (set, 1)))
  }.reduceByKey { (t1, t2) =>
    t1._1 ++= t2._1
    val added = t1._2 + t2._2
    (t1._1, added)
  }.map { t =>
    (t._1, (t._2._1, mutable.Set.empty[DvcId], t._2._2))
  }
// MR2
val rdd2: RDD[(DvcId, (mutable.Set[DvcId], mutable.Set[DvcId], Int))] = rdd1
  .flatMap(flatIdSet).reduceByKey(tuple3Add)
// MR3
val rdd3: RDD[(DvcId, (mutable.Set[DvcId], mutable.Set[DvcId], Int))] = rdd2
  .flatMap(flatIdSet).reduceByKey(tuple3Add)
// filter
val rdd4 = rdd3.filter { t =>
  t._2._2 += t._1
  t._2._3 == 1 || (t._2._1 -- t._2._2).isEmpty
}.map(_._2._1).distinct()

// flat id_set
def flatIdSet(row: (DvcId, (mutable.Set[DvcId], mutable.Set[DvcId], Int))) = {
  row._2._3 match {
    case 1 =>
      Array((row._1, (row._2._1, row._2._2, row._2._3)))
    case _ =>
      row._2._2 += row._1 // add key to keySet
      row._2._1.map(d => (d, (row._2._1, row._2._2, row._2._3))).toArray
  }
}

def tuple3Add(t1: (mutable.Set[DvcId], mutable.Set[DvcId], Int),
              t2: (mutable.Set[DvcId], mutable.Set[DvcId], Int)) = {
  t1._1 ++= t2._1
  t1._2 ++= t2._2
  val added = t1._3 + t2._3
  (t1._1, t1._2, added)
}

其中,引入常量 1 是为了标记该条记录是否发生了 ID 聚合的情况。

ID 强打通算法实现起来比较简单,但是在实际的应用时,日志数据往往是带噪声的:

  • 有山寨设备;

  • ID 之间存在着一对多的情况,比如,各业务线的 UID 的靠谱程度不一,有的 UID 会对应到多个设备。

另外,ID 强打通后是 HDFS 的离线数据,为了提供线上服务、保证 ID 之间的一一对应关系,应选择何种分布式数据库、表应如何设计、如何做到数据更新时而不影响线上服务等等,则是另一个需要思考的问题。


更多高质资源 尽在AIQ 机器学习大数据 知乎专栏 点击关注

转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com