0%

推荐系统-协同过滤

协同过滤在整个推荐系统的发展历史上具有重要地位 本文介绍了在分布式环境下如何利用协同过滤的思想进行推荐, 协同思想在推荐系统中会反复使用, 工程上实现简单, 业务可解释性强。协同过滤大体上分为基于用户(user-based)的协同过滤算法和基于商品(item-based)的协同过滤算法。 两种算法在具体实现上没有过多区别。

user-based算法主要步骤:

  1. 找到和目标用户相似的用户集合
  2. 找到这个集合中的用户喜欢的, 并且目标用户没有听说过的物品推荐给目标用户

item-based算法主要步骤:

  1. 计算物品之间的相似度
  2. 根据物品的相似度和目标用户的历史行为生成推荐列表

稀疏矩阵表示用户商品交互

如果用户对于商品的交互信息用一个矩阵来表示, 毫无疑问这是一个稀疏矩阵, 在实际业务场景中会存在大量的缺失值, 如何猜出这些缺失值并且有效推荐给用户? 我们采用隐式反馈模型来考量, 某个用户对某一商品有交互有偏好, 那么值为1否则为0。在预测用户行为(隐式反馈)时,大部分只有用户正向选择行为的记录(正样本),这类问题即为One Class 问题,即单类问题。例如我们用0-1矩阵来表示这类问题,1的解释即为用户喜欢该物品,但是0却有两种原因来解释:一是用户没有看到该物品,但是用户喜欢;二是用户已经浏览过了但是不感兴趣。当我们使用协同过滤算法时, 解决0的办法就是通过相似性.

隐式反馈 显示反馈
准确度
丰富度
获取难度 容易 困难
数据噪音 较难识别 较易识别
上下文敏感
用户偏好的表达能力 只有正样本 包含正负样本
评估比较标准 相对比较 绝对比较

计算相似度

我们以item-based为例 两个物品的相似度可以表v示为 \[w_{i,j} = \frac{\left| N(i) \bigcap N(j) \right| }{\sqrt{\left|N(i)\right|\left|N(j)\right|}} \tag{1}\]

\(N(i)\)是喜欢物品i的用户数在协同过滤中两个物品产生相似度是因为它们共同被喜欢,每一个用户都通过历史兴趣列表给物品”贡献“ 相似度。这其中蕴含着一种假设,每一个用户的兴趣都局限在某几个方面,因此如果两个物品属于一个用户的兴趣列表,那么这两个物品可能就属于有限的几个领域,而如果两个物品属于很多用户的兴趣列表,那么他们可能属于同一个领域, 因而有很大的相似度

上面的相似度公式在实际使用中还有改进的地方, 上文中说到每个用户的兴趣列表都对物品的相似度产生贡献, 但是不是每一个用户的贡献都相同。对于特别活跃的用户(比如某用户的兴趣列表长达几十万), 这类用户对于物品之间的贡献度应该小, 即活跃用户对物品的相似度贡献应该小于不活跃的用户, 使用一个IUF(Inverse User Frequence)参数来修正公式

\[ w_{ij}= \frac{\sum_{u\in N(i) \bigcap N(j)}\frac{1}{\log{(1 + \left| N(u) \right|)}} }{\sqrt{\left| N(i) \right| \left| N(j) \right|}} \tag{2}\]

我们看到分子相比较上面的公式, 不再是简单的相加而是带有了一个iuf的系数, 我们还可以进一步将时间上下文考虑进去。我们有一个假设用户在相隔很短的时间内喜欢的物品具有更高的相似度, 如果用户对物品i和物品j产生行为的时间越远物品和物品j的相似度就不大, 据此我们得到如下的相似度公式 \[ w_{ij}= \frac{\sum_{u\in N(i) \bigcap N(j)}\frac{1}{\log{(1 + \left| N(u) \right|)}} \frac{1}{1 + \alpha \left| t_{ui} - t_{uj} \right|} }{\sqrt{\left| N(i) \right| \left| N(j) \right|}} \tag{3} \]

其中\(\alpha\)是时间衰减参数, 如果一个系统用户兴趣变化很快, 取较大的\(\alpha\),反之取较小的\(\alpha\)

利用spark计算item相似度

可以利用spark计算两个稀疏矩阵的乘法具体的原理见文章,以下是核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
*
* @param spark: sparkSession
* @param user_item: userID,itemID,(Interaction, interactionTime)
*/
def itemCF(spark: SparkSession, user_item: RDD[(Int, Int, (Double, Double))]) = {


val item_user = user_item.map(x => (x._2, x._1, x._3))

val useriuf = user_item.map(x => (x._1, x._2)).aggregateByKey(0)((z, b) => z + 1, _ + _) // userIDx, list(itemIdx)
.map(x => (x._1, 1 / Math.log(1 + x._2))) // user对应的IUF系数
.collectAsMap()

println("useriuf size ", useriuf.size)
val bruseriuf = spark.sparkContext.broadcast(useriuf)

val lm = item_user.map(x => (x._2, (x._1, x._2, x._3))).partitionBy(new HashPartitioner(500)) //"l_row", "l_column", "l_val"
val rm = user_item.map(x => (x._1, (x._1, x._2, x._3))).partitionBy(new HashPartitioner(500)) //"r_row", "r_column", "r_val"

val numerator = lm.join(rm)
.map(x => ((x._2._1._1, x._2._2._2), x._1, 1/(1 + TIME_DECAY_ALPHA * math.abs(x._2._1._3._2 - x._2._2._3._2)/HOURSECONDS))) // 这里拿到用户 以及用户访问两个item的时间差
.map(x => (x._1, bruseriuf.value.getOrElse(x._2, 0.0) * x._3))
.partitionBy(new HashPartitioner(500))
.aggregateByKey(0.0)(_ + _, _ + _)
.filter(x => x._1._1 >= x._1._2) //Gram矩阵 主对角线对称 取下对角

val itemMatrix = sparseMatrixMultiple(spark, item_user.map(x=>(x._1,x._2, x._3._1)), user_item.map(x=>(x._1, x._2, x._3._1))).map(x => (x._1._1, x._1._2, x._2)).cache()

val diagonal = itemMatrix.filter(x => x._1 == x._2).map(x => (x._1, x._3)).collectAsMap()

val brdiagonal = spark.sparkContext.broadcast(diagonal) //每个商品对应的交互用户

val rs = itemMatrix.filter(x => x._1 >= x._2).map(x => {
val i = brdiagonal.value(x._1)
val j = brdiagonal.value(x._2)
((x._1, x._2), Math.sqrt(i * j))
})
.partitionBy(new HashPartitioner(500))
.join(numerator)

itemMatrix.unpersist()

val res = rs.map(x => (x._1._1, x._1._2, x._2._2 / x._2._1))
.flatMap(x => if (x._1 != x._2) Array((x._1, x._2, x._3), (x._2, x._1, x._3)) else Array((x._1, x._2, x._3)))//需要把矩阵展开
res
}

/**
* rdd 稀疏矩阵相乘
* @param spark sparkSession
* @param l 左矩阵
* @param r 右矩阵
*/
def sparseMatrixMultiple(spark: SparkSession, l: RDD[(Int, Int, Double)], r: RDD[(Int, Int, Double)]) = {

val lm = l.map(x => (x._2, (x._1, x._2, x._3))).partitionBy(new HashPartitioner(500)) //"l_row", "l_column", "l_val"
val rm = r.map(x => (x._1, (x._1, x._2, x._3))).partitionBy(new HashPartitioner(500)) //"r_row", "r_column", "r_val"

val f = lm.join(rm) // l_column = r_row
.map(x => ((x._2._1._1, x._2._2._2), x._2._1._3 * x._2._2._3))
.partitionBy(new HashPartitioner(500))
.aggregateByKey(0.0)(_ + _, _ + _)
f
}

  • 使用隐式反馈模型, 用户商品交互数据用一个稀疏矩阵表示

  • 如何计算每个item有多少user交互以及item两两之间有多少相同user交互? 对于这部分计算是可以等价成两个矩阵相乘, 假设矩阵X是用坐标表示的稀疏矩阵(i, j, v) i是item对应id, j是user对应id, 那么\(XX^T\)中, 主对角线的每个元素就是对应item所有交互的user数, 其它元素是两两item之间共同的交互用户数

  • \(XX^T\) 是一个Gram对称矩阵, 可以适当减少计算量加快计算速度

使用item相似度

我们需要取item相似的前topK数据

1
2
3
4
5
6
7
8
val itemSimRDD = itemSim.filter(x => x._1 != x._2)
.map(x => (x._1, (x._2, x._3)))
.topByKey(500)(Ordering.by[(Int, Double), Double](_._2))
//进行归一化操作
.map(x => {
val max = x._2.head
(x._1, x._2.map(t => (t._1, t._2 / max._2)))
})

  • 计算item相似度的topK可以使用spark提供的topByKey功能, 然后做归一化操作, 归一化以后可以增加推荐的准确度和推荐的覆盖度

根据item相似度推荐

一般来说用户现在的行为和用户最近的行为关系更大

\[p(u, i) = \sum_{j \in N(u) \bigcap S(i, k)} sim(i, j)\frac{1}{1 + \beta\left| t_0 - t_{uj} \right|} \tag{4} \] 这里\(N(u)\)是用户交互的物品集合, \(S(i, k)\) 是和物品i最相似的topK物品, \(sim(i, j)\)是物品i和j的相似度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
*
* @param spark: sparkSession
* @param interactRDD: user, item, timeStamp
* @param simItem: item->Array(item, simscore)
*/
def prediction(spark:SparkSession, interactRDD:RDD[(String, String, Double)],
simItem: Map[String, Array[(String, Double)]]) ={


val brsimItem = spark.sparkContext.broadcast(simItem)
val currentSec = (System.currentTimeMillis()/1000).toDouble

val pred = interactRDD.map(x => (x._1, (x._2, x._3)))
.aggregateByKey(Array.empty[(String, Double)])(_:+_, _++_)
.map(x=> (x._1, x._2.toMap))
.map{x =>
val predItem = brsimItem.value.mapValues{ y =>
y.map{t =>
if(x._2.isDefinedAt(t._1))
1/(1 + TIME_DECAY_BETA * math.abs(currentSec - x._2.get(t._1).get) / HOURSECONDS) * t._2
else
0.0
}.sum
}.toList.sortBy(_._2)(Ordering.Double.reverse).take(500)

(x._1, predItem)
}
pred
}

  • 物品相似度矩阵存储每个item最相思的topK商品, 使用这个相似度矩阵的核心思想是: 和用户历史上感兴趣的物品越相似的物品,, 越有可能在用户的推荐列表中获得比较高的排名
  • 在spark中计算如果广播量太大可以切分成较小的部分, 分批计算
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
*
* @param spark sparkSession
* @param interactRDD user, item, timeStamp
* @param simItem item->Array(item, simscore)
* @return
*/
def prediction2(spark:SparkSession, interactRDD:RDD[(String, String, Double)],
simItem: RDD[(String, Array[(String, Double)])]) ={

val currentSec = (System.currentTimeMillis()/1000).toDouble

val simItemAsIterator = simItem.repartition(10).mapPartitions{it =>
Iterator.single(it.toMap)
}

interactRDD.cache()

val prediction = (for( itemSim <- simItemAsIterator.toLocalIterator) yield {
val brsimItem = spark.sparkContext.broadcast(itemSim)
val pred = interactRDD.map(x => (x._1, (x._2, x._3)))
.aggregateByKey(Array.empty[(String, Double)])(_:+_, _++_)
.map(x=> (x._1, x._2.toMap))
.map{x =>
val predItem = brsimItem.value.mapValues{ y =>
y.map{t =>
if(x._2.isDefinedAt(t._1))
1/(1 + TIME_DECAY_BETA * math.abs(currentSec - x._2.get(t._1).get) / HOURSECONDS) * t._2
else
0.0
}.sum
}.toList.sortBy(_._2)(Ordering.Double.reverse).take(500)

(x._1, predItem)
}
pred
}).reduce(_ ++ _).reduceByKey(_ ++ _)
.map(x=>(x._1, x._2.sortBy(_._2)(Ordering.Double.reverse).take(500)))
interactRDD.unpersist()
prediction
}

userCF和itemCF对比

摘自推荐系统与实战

userCF itemCF
性能 适用用户较少场景, 用户较大场景下计算用户相似度矩阵代价较大 适用用户数明显小于用户数的场景
领域 适用时效性强, 用户个性化兴趣不太明显的领域 长尾物品丰富,用户个性化需求强烈的领域
实时性 在用户有新行为是, userCF不会造成推荐结果的立即变化 用户有新行为 itemCF一定会导致推荐结果的实时变化