AIQ | 实时推荐系统设计与实现



转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com

AIQ 机器学习大数据 知乎专栏 点击关注

1. 实时推荐系统与相关工作

1.1 原因

实时计算能够及时捕获用户短时兴趣,同时能够快速反馈分发当前系统的用户兴趣内容。大量实践以及发表的文章都显示了推荐系统实时化,对推荐精准度的提升的有效性和必要性。

1.2 腾讯架构与实现

实时推荐相关工作非常多,腾讯和北大合作的两篇 SIGMOD 文章是比较实际和详细的实现,采用的计算框架能够支持大规模数据的实时推荐,以下将会分开简述以下两篇文章。

2015 年

Huang 发表了基于 Storm 和 KV 存储的大规模实时推荐系统 (TencentRec: Real-time Stream Recommendation in Practice)

  1. 实现了一系列经典推荐算法的实时版本

  2. 实现了数种实时算法提高推荐精度

  3. 广泛应用于业务有效提高

腾讯采用使用 storm 原因,支持实时数据流式计算,良好的可扩展性、可容错性,采用简单编程模型。
文章核心包括实时增量计算的 ItemCF,以及用户隐式反馈计算、实时剪枝算法、基于用户画像的数据稀疏性策略。应用在多个业务上都有不同程度的提升,最明显的是腾讯视频的全局表现提升高达 30%。

全文核心应该是下图六道公式,阐述腾讯如何具体实现的增量 itemcf。

文章中的 co-rating, 其实就是我们常说的 user bias. 公式 3 和 4 解决了用户隐式反馈问题,细节的计算可以参考 2016 的文章,实际是一个 log 函数融合了用户的浏览、点击、分享、购买等行为,转化成 rating.

corating.png

请注意公式 4,由于他们定义了 corating,实际是将相似度的增量计算从 L2 范数的计算转化成了 L1 范数计算.(当 Rup 取 x 的时候,y=1/x)。

可扩展的增量计算

itemcf.png

initemcf.png

2016 年

腾讯视频的推荐应用 (Real-time Video Recommendation Exploration)

  1. 实时处理、大规模数据下的准确率和可扩展性。

  2. 开发了一个基于矩阵分解的大规模在线协同过滤算法,以及一系列的自适应更新策略。

  3. 通过增加包括视频类别、时间因素影响、用户画像剪枝以及训练等方法,提高实时 TopN 推荐的精度。

在我们看来,全文核心在于实时计算的数据流转,如下图所示:

tecvideo.png

基于 storm 的实时计算 topology 图:

topo.png

2. 糖豆的设计与实现

2.1 架构

糖豆整体推荐框架,从离线,近线,在线三套计算流程组合而成。在线流程基于 Spark Streaming 框架实现,部署在近线集群。 在线推荐框架实时根据用户行为,生成实时推荐列表,从而满足用户瞬时兴趣,提高推荐系统的推荐新鲜度。简单架构图如下:

糖豆实时架构.png

2.2 基于 Spark Streaming 的实现

2.2.1. 计算流程

实时计算流程如下图所示:

实时计算流程图

分解步骤:

  1. Spark Streaming 读取 Kafka,原始日志 ETL

  2. 提取用户隐式反馈,生成候选集 tuple (uid,vid)

  3. 每天凌晨会将离线计算好的 ItemCF 模型结果集导入 Redis。itemcf 数据结构是一个 similarity vid list。

  4. 实时维护看过视频 set, 对看过视频的处理候选集 tuple 过滤该用户看过的视频

  5. 实时更新推荐过视频 set, 候选集 tuple 过滤当天已经被推荐过的视频

  6. 候选集写入 Redis 推荐 list

python 实现:

if __name__ == "__main__":    print sys.argv
	reload(sys)
	sys.setdefaultencoding('utf-8')
	sc = SparkContext(appName="real_time_etl")    #20秒
	ssc = StreamingContext(sc, 15)
	brokers = "kafka-servers:9092"
	topic = "logstash"
	#读取kafka
	kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})   #解析日志、过滤无关数据、读取相似视频
	lines = kvs.map(lambda x : readJson(x[1])).filter(lambda x: x is not None).map(lambda x: getTopkfromRedis(x))    #lines.pprint()
	 #写入推荐结果
	lines.foreachRDD(lambda rdd: list2Redis(rdd))  
	ssc.start()
	ssc.awaitTermination()

2.2.2 监控

部署在集群 Master 节点的监控脚本会每 30s 扫描一次实时计算代码进程,如果发现进程被 failed,会自动拉起实时计算 Spark Steaming 进程。如果进程拉起失败会触发邮件、短信报警

#! /bin/shMOBILE="your phone numbers"RT_HOME=/home/realtime/recommend.py

DIR=/data/rtdamon
PID_FILE=$DIR/.run/rt-litetl-damon.pid
LOG_FILE=$DIR/.log/rt-litetl-damon.log
t=$(date -d "today" +"%Y-%m-%d %H:%M:%S")source /etc/profile 
echo $PID_FILE $LOG_FILEif [ -e "$PID_FILE" ];then
        pid=`cat $PID_FILE`        echo $pid
        damon_process_exists=`ps v -p $pid | grep "rt-litetl-damon.sh" | grep -v grep|grep -v \<defunct\> `        echo "damon process exists : $process_exists"
        if [ -n "$damon_process_exists" ]        then
                echo "Process rt-litetl-damon.sh is running! $t" >> $LOG_FILE
                exit
        fifipid=$$echo "$pid" > $PID_FILEwhile :do
        process_exists=`ps -ef|grep "$RT_HOME"|grep "spark"|grep -v grep|wc -l`        echo "process exists : $process_exists" >>$LOG_FILE
        if [ "$process_exists" == "0" ]; then/hadoop/spark/bin/spark-submit  --master yarn --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0 --py-files /hadoop/user/rt/redis.zip --num-executors 10 --executor-cores 7 --executor-memory 6g /home/realtime/recommend.py>>/data/rtlog/rtrecommed.log  2>&1 &
    /usr/bin/php -f /data/rtdamon/yunsms.class.php "$MOBILE" "recommend.py"
                echo "realtime recommendation process already restarted at $t" >> $LOG_FILE
        fi

        #sleep `expr 3600 \* 3`
        sleep `expr 60 \* 1`done
 


3. 问题与改进

  1. 较多代码逻辑集中在 Redis。目前 Redis 无灾备措施,同时 IO 和负载也会出现 Peak。

  2. Spark Streaming 目前实时级别在分钟级。需要升级成 storm 的秒、毫秒级别。

  3. 需要用户点击等行为才会生产数据,容易召回不足。


更多高质资源 尽在AIQ 机器学习大数据 知乎专栏 点击关注

转载请注明 AIQ - 最专业的机器学习大数据社区  http://www.6aiq.com