Skip to content
matt90luo's Blog
Go back

推荐系统-使用ALS求解LFM模型

在之前的文章中我曾写到, 用户和物品的交互关系可以是一个稀疏矩阵, 我们把这个矩阵称作评分矩阵R{m,n}R_\{m,n\}, 比如下面的例子, ? 表示缺失

user/itemitem_0item_1item_2item_3item_4
user_0?212?
user_12??12
user_2?112?
user_31??13
<!— more —>

所以我们有评分矩阵:

R_\{m,n\} = \\begin\{bmatrix\} ? & 1 & 1 & 2\\\\ 2 & ? & ? & 1 \\\\ ? & 1 & 1 & 2 \\end\{bmatrix\}

分解评分矩阵

如果对评分矩阵RR做低秩矩阵分解(Low Rank Matrix Factorization), 既有R{m,n}=P{m,f}Q{n,f}TR_\{m, n\} = P_\{m, f\} * Q_\{n, f\}^T, 这就引申到LFM (Latent Factor Model) 隐因子模型,其中隐因子可以理解为一个用户喜欢一个商品的隐形原因,比如电影里面有他喜欢的romantic和action元素,还有他喜欢的某个演员或者导演编剧。如果另外一个电影有类似的元素跟演员,那么他很有可能会也喜欢这部电影。LFM的核心思路就是求出用户矩阵PP和商品矩阵QQ

在协同过滤中我们使用item相似度做推荐的核心思想可以用下列公式表达 \begin{equation} \hat{r}{u,i} = \frac{\sum{j \in S^k(i,u)} s_{i,j} r_{u, i}}{ \sum_{j \in S^k(i, u)} s_{i, j}} \end{equation}

其中sk(i,u)s^k(i, u) 代表和物品i最相似的, 并且被用户u交互过k个商品,也就是N(u)bigcapS(i,k)N(u) \\bigcap S(i, k),如果从LFM的角度考虑, 从评分矩阵求得对应的用户矩阵和商品矩阵, 使得sumlimits{r{u,i}isknwon}(r{ui}puTqi)2\\sum\\limits_\{r_\{u, i\}\\ is\\ knwon\}(r_\{ui\} - p_u^Tq_i)^2 最小, 那么我们就可以用PQP*Q 来代替评分矩阵从而填充了那些缺失的评分值。

ALS交替最小二乘法

\begin{equation} \min \limits_{p*, q*} \sum\limits_{r_{u, i}\ is\ knwon}(r_{ui} -p_u^Tq_i)^2 + \lambda(||p_u||^2 + ||q_i||^2) \end{equation}

下面就是对上述公式求解使用ALS算法求解P,QP,Q矩阵。查阅了一些网上的教程,可以直接化简为一般性的线性回归问题,对于mathbf{X}thetamathbf{Y}\\mathbf\{X\}\\theta-\\mathbf\{Y\}, theta\\theta的解析解为 theta=(mathbf{X}Tmathbf{X}+lambdamathbf{I}){1}mathbf{X}TY\\theta=(\\mathbf\{X\}^T\\mathbf\{X\}+\\lambda\\mathbf\{I\})^\{-1\}\\mathbf\{X\}^TY, 同理对于mathbf{R}mathbf{X}Tmathbf{Y}\\mathbf\{R\}-\\mathbf\{X\}^T\\mathbf\{Y\}, 当固定mathbf{X}\\mathbf\{X\}时, mathbf{Y}=(mathbf{X}mathbf{X}T+lambdamathbf{I})mathbf{X}mathbf{R}\\mathbf\{Y\}=(\\mathbf\{X\}\\mathbf\{X\}^T+\\lambda\\mathbf\{I\})*\\mathbf\{X\}*\\mathbf\{R\}, 当固定mathbf{Y}\\mathbf\{Y\}时,mathbf{X}=(mathbf{Y}mathbf{Y}T+lambdamathbf{I})mathbf{Y}mathbf{R}T\\mathbf\{X\}=(\\mathbf\{Y\}\\mathbf\{Y\}^T+\\lambda\\mathbf\{I\})*\\mathbf\{Y\}*\\mathbf\{R\}^T

但是,我对以上做法持保留意见,在迭代过程中并没有发现RMSE随迭代减小。随后我阅读了原始的文献, 我觉得对于缺失的评分值不应该加入运算,,只会考虑有评分值得情况所对应的线性回归问题。

假设现在有一个稀疏矩阵RR,以及对应的MM, NN,稀疏矩阵每次按列计算,并且只计算对应值得矩阵, 如公式ref{eq3}\\ref\{eq3\}所示,稀疏矩阵RR00列只在00行和22行有值, 那么我们取矩阵MM0022行,计算线性回归M{0,2}theta0R{0}M_\{0,2\}\\theta_0 - R^\{0\},可以利用正规方程法或者梯度下降法求解theta0\\theta_0,也就是NN矩阵的第00行, 直到求出所有的矩阵NN的向量。然后再将NN固定,此时注意维度的统一需要将RR转置, 求得MM, 如此循环往复。

\begin{equation}\label{eq3}\begin{split} R &= \begin{array}{cc} & \begin{matrix} 0&1&2&3&4\end{matrix}\ \begin{matrix} 0\ 1 \ 2 \ 3 \end{matrix} & \begin{bmatrix}X& & X & &X \ & X& X& X&\ X& X& & X& \ & & X& X& \end{bmatrix} \end{array}\\ M &= \begin{matrix} & k \ m & \begin{bmatrix} ---\---\---\---\\end{bmatrix} \end{matrix} \\

N &= \begin{matrix} & k \ n & \begin{bmatrix} ---\---\---\---\---\\end{bmatrix} \end{matrix} \end{split}\end{equation}

使用python实现ALS

import numpy as np
import matplotlib.pyplot as plt

def rmse(M, N, R):
    a = np.matmul(M, N.T)
    idx = list(map(lambda x: (x[0], x[1]), R))
    aa = list(map(lambda x:a[x[0]][x[1]], idx))
    r = list(map(lambda x: x[2], R))
    return np.sqrt(((np.array(aa) - np.array(r))**2).sum()/len(R))

def get_sparse_matrix():
    r = [(0, 0, 1), (0, 2, 3), (0, 4, 5),
         (1, 1, 1), (1, 2, 1), (1, 3, 1),
         (2, 0, 1), (2, 1, 6), (2, 3, 6),
         (3, 2, 1), (3, 3, 1)]
    return 4, 5, r

def sparse_matrix_transpos(l, r, mat):
    m = list(map(lambda x: (x[1], x[0], x[2]), mat))
    return r, l, m

def initial(m, n, k):
    return np.random.rand(m, k), np.random.rand(n, k)

def als(M, N, l, r, R, lam=0.01):
    res =[]
    for i in range(r):
        mat = list(filter(lambda x: x[1] == i, R))
        y = list(map(lambda x: x[2], mat))
        idx = list(map(lambda x: x[0], mat))
        x = np.array(list(map(lambda x: M[x], idx)))
        theta = np.matmul(np.matmul(np.linalg.inv(np.matmul(x.T, x) + lam*np.eye(M.shape[1])), x.T), y)
        res.append(theta)

    NN = np.array(res)
    RR = list(map(lambda x: (x[1], x[0], x[2]), R))
    Res = []
    for j in range(l):
        mat = list(filter(lambda x: x[1] == j, RR))
        y = list(map(lambda x: x[2], mat))
        idx = list(map(lambda x: x[0], mat))
        x = np.array(list(map(lambda x: NN[x], idx)))
        theta = np.matmul(np.matmul(np.linalg.inv(np.matmul(x.T, x) + lam * np.eye(NN.shape[1])), x.T), y)
        Res.append(theta)
    MM = np.array(Res)
    return MM, NN

def iterALS(R, iter=10):
    it = 0
    M, N = initial(4, 5, 2)
    rmse_res = []
    while(it &lt; iter):
        M, N = als(M, N, 4, 5, R) # als(M, N, l, r, R, lam=0.1)
        rmse_val = rmse(M, N, R)
        rmse_res.append(rmse_val)
        print("iter = %d"%it, "RMSE = %f"%rmse_val)
        it += 1
    plot_acc_loss(rmse_res)

def plot_acc_loss(loss):
    plt.figure("rmse")
    plt.plot(range(len(loss)), loss, 'r')
    plt.show()

if __name__ == '__main__':
    m, n, mat = get_sparse_matrix()
    iterALS(mat, iter=10)

RMSE如下图:

利用spark计算ALS

鉴于评分矩阵的稀疏性, 我们同样可以借助spark来计算,具体代码如下

import org.apache.log4j.&#123;Level, Logger&#125;
import org.apache.spark.&#123;SparkConf, SparkContext&#125;
import org.apache.spark.sql.SparkSession
import breeze.linalg._
import breeze.numerics._
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer
object Als &#123;

  private var spark: SparkSession = _
  private var conf: SparkConf = _
  private var sc: SparkContext = _

  /**
    * 随机初始化 M, N
    * @param m
    * @param n
    * @param k
    * @return
    */
  def initial(m:Int, n:Int, k:Int) =&#123;
      val M = sc.parallelize((0 until m ).map(x =&gt; (x, DenseVector.rand(k))))
      val N = sc.parallelize((0 until n ).map(x =&gt; (x, DenseVector.rand(k))))
    (M, N)
  &#125;

  def getRatingMatrix() =&#123;
    val mR = sc.parallelize(List((0, 0, 1), (0, 2, 3), (0, 4, 5),
    (1, 1, 1), (1, 2, 1), (1, 3, 1),
    (2, 0, 1), (2, 1, 6), (2, 3, 6),
    (3, 2, 1), (3, 3, 1)).map(x =&gt; (x._1, x._2, x._3.toDouble)))
    val nR = mR.map(t =&gt; (t._2, t._1, t._3))
    (mR.cache(), nR.cache())
  &#125;

  /**
    * 计算稀疏矩阵的rmse
    * @param M
    * @param N
    * @param mR
    * @return
    */
  def rmse(M:RDD[(Int, DenseVector[Double])], N:RDD[(Int, DenseVector[Double])], mR:RDD[(Int, Int, Double)])=&#123;
     val errRDD = mR.map(x =&gt; (x._1, (x._2, x._3))).aggregateByKey(List.empty[(Int, Double)])(_:+_, _++_)
      .join(M).map(x =&gt; (x._2._1, x._2._2)).flatMap(t =&gt; t._1.map(x =&gt; (x._1, (t._2, x._2))))
      .join(N).map(x =&gt; (x._2._1._1, x._2._2, x._2._1._2)) // mVect, nVec, r

    sqrt(errRDD.map(x =&gt; x._1.t * x._2 - x._3).map( x =&gt; pow(x, 2)).sum() / errRDD.count())
  &#125;

  /**
    * 规定维度约束 M * N.t = mR 或者 N * M.t= nR
    * @param M
    * @param N
    * @param mR
    * @param nR
    */
  def als(M:RDD[(Int, DenseVector[Double])], N:RDD[(Int, DenseVector[Double])],
          mR:RDD[(Int, Int, Double)], nR:RDD[(Int, Int, Double)]) =&#123;

    val NN = mR.map(x =&gt; (x._1, (x._2, x._3))).join(M).map(x =&gt; (x._2._1, x._2._2))
      .map(x =&gt; (x._1._1, (x._2, x._1._2)))
      .aggregateByKey(List.empty[(DenseVector[Double], Double)])(_:+_, _++_)
      .map&#123;x =&gt;

        val matrixX = new DenseMatrix[Double](x._2.length, x._2.head._1.length)

        val matX = x._2.foldLeft((matrixX, 0))(  (z, t) =&gt; &#123; z._1(z._2,:: ):=t._1.t
          (z._1, z._2+1)&#125;)._1
         val vecY = new DenseVector(x._2.map(_._2).toArray)
        (x._1, inv(matX.t * matX + 0.01) * matX.t * vecY)
      &#125;

    val MM = nR.map(x =&gt; (x._1, (x._2, x._3))).join(NN).map(x =&gt; (x._2._1, x._2._2))
      .map(x =&gt; (x._1._1, (x._2, x._1._2)))
      .aggregateByKey(List.empty[(DenseVector[Double], Double)])(_:+_, _++_)
      .map&#123;x =&gt;

        val matrixX = new DenseMatrix[Double](x._2.length, x._2.head._1.length)

        val matX = x._2.foldLeft((matrixX, 0))(  (z, t) =&gt; &#123; z._1(z._2,:: ):=t._1.t
          (z._1, z._2+1)&#125;)._1
        val vecY = new DenseVector(x._2.map(_._2).toArray)
        (x._1, inv(matX.t * matX + 0.1) * matX.t * vecY)
      &#125;
    (MM, NN)
  &#125;


  def iterALS(M:RDD[(Int,DenseVector[Double])], N:RDD[(Int, DenseVector[Double])],
              mR:RDD[(Int, Int, Double)], nR:RDD[(Int, Int, Double)], iter:Int = 10)=&#123;

    val rmse_list =ArrayBuffer.empty[(Int, Double)]
    val res = (0 until iter).foldLeft( (M, N, mR, nR))( (z, b) =&gt;&#123;
      val res = als(z._1, z._2, z._3, z._4)
      val rmseVal = rmse(res._1, res._2, mR)
      rmse_list.append((b, rmseVal))
      (res._1, res._2, mR, nR)
    &#125;)
    rmse_list.foreach(println)
  &#125;


  def main(args: Array[String]): Unit = &#123;
    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)

    conf = new SparkConf().setMaster("local[4]").setAppName("demo")
    spark = SparkSession.builder().config(conf)
      .config("spark.some.config.option", "some-value")
      .config("spark.sql.broadcastTimeout", "20000")
      .config("spark.driver.maxResultSize", "25g")
      .config("spark.sql.shuffle.partitions", "600")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.shuffle.file.buffer.kb", "10240")
      .config("spark.storage.memoryFraction", "0.2")
      .config("spark.shuffle.memoryFraction", "0.6")
      .config("spark.sql.crossJoin.enabled", "true").getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")
    sc = spark.sparkContext

    val initMatrix = initial(4, 5, 2)
    val M = initMatrix._1
    val N = initMatrix._2

    val R= getRatingMatrix()
    val mR = R._1
    val nR = R._2

    iterALS(M, N, mR, nR)

  &#125;
&#125;

需要计算的向量会join在一起, 如果两个分解的矩阵不大的话,还可以作为广播变量进一步提高效率。其中的线性回归部分借助breeze包,方法依然是正规方程法。可以替换用梯度下降法。每次迭代的RMSE如下:

(iter:0,rmse = 0.376204)
(iter:1,rmse = 0.126822)
(iter:2,rmse = 0.061821)
(iter:3,rmse = 0.047608)
(iter:4,rmse = 0.042685)
(iter:5,rmse = 0.039862)
(iter:6,rmse = 0.037896)
(iter:7,rmse = 0.036419)
(iter:8,rmse = 0.035267)
(iter:9,rmse = 0.034347)

小结

  1. ALS算法本质上也就是线性回归,但是要注意因为稀疏矩阵的存在导致参与计算的只有部分向量。
  2. 单机可以处理的情况推荐使用正规方程法,对于更大规模的数据可以使用梯度下降法。

Share this post on:

Previous Post
机器学习--spark基于opencv做图片分类
Next Post
大数据--spark-streaming开发总结