离线或者实时, 数据量大或者小,不同的场景我们的解决方案肯定不一样的。

1 实时

https://blog.csdn.net/xindoo/article/details/79946492

https://zhuanlan.zhihu.com/p/35143913

2 离线

第一步,注册表统计每个产品以及url的次数

第二步,按照产品分区,对每个分区内数据进行排序

spark如何解决常见的TOP-N问题 这篇文章的思路就是:

/**
  * 2018年6月21日13:05:00
  * 使用 pair RDD的 repartitionAndSortWithinPartitions
  * @param args 无参数
  */
def main(args: Array[String]) {
  val conf=new SparkConf().setAppName("Test").setMaster("local[2]")
  val sc=new SparkContext(conf)
  val array=Array(2,4,6,67,3,45,26,35,789,345)
  val data=sc.parallelize(array)
  // 替换repartition组合sortBy
  data.zipWithIndex().repartitionAndSortWithinPartitions(new HashPartitioner(1)).foreach(println)
 /* 输出:
  (2,0)
  (3,4)
  (4,1)
  (6,2)
  (26,6)
  (35,7)
  (45,5)
  (67,3)
  (345,9)
  (789,8)
  */
}

整体感觉上述方法绕

3 离线传统解决方案

package Basic
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Created by tg on 10/25/16.
  */
object Top3 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("Top3").setMaster("local")
    val sc=new SparkContext(conf)
    //从HDFS中读取数据
    val lines=sc.textFile("hdfs://tgmaster:9000/in/nums2")
    /**
      * 1、通过map算子形成映射
      * 2、通过sortByKey算子针对Key进行降序排列
      * 3、通过map算子获取排序之后的数字
      * 4、通过take算子获取排序之后的前3个数字
      * 5、通过foreach算子遍历输出
      */
    lines.map(m=>(m.toInt,m.toInt)).sortByKey(false)
      .map(m=>m._1).take(3).foreach(println _)
  }
}

4 离线分组排序解决方案

  val lines = Array("class1 10", "class2 20", "class3 12", "class4 15", "class1 40")
  val data=sc.parallelize(array)

5 教程

MapReduce/Hadoop 下的top-10解决方案;假定所有输入的key都是唯一的。

Spark/Hadoop 下的top-10解决方案;假定所有输入的key都是唯一的

Spark/Hadoop 下的top-10解决方案;假定所有输入的key不是唯一的

    主要提供3种解决方案:
  • 确保所有K是唯一的。要保证K是唯一的,我们要把输入映射到rdd的k-v对,然后交给reduceByKey
  • 将所有唯一的(K, V)对划分为M个分区
  • 找出各个分区的top-N(我们称为本地topN)
  • 找出所有本地topN的最终topN
package org.dataalgorithms.chap03.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.SortedMap

/**
 * Find TopN (N > 0) using mapPartitions().
 * Each partition finds TopN, then we find TopN of all partitions.
 *
 *
 * @author Gaurav Bhardwaj (gauravbhardwajemail@gmail.com)
 *
 * @editor Mahmoud Parsian (mahmoud.parsian@yahoo.com)
 *
 */
object TopN {

  def main(args: Array[String]): Unit = {
    if (args.size < 1) {
      println("Usage: TopN <input>")
      sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("TopN")
    val sc = new SparkContext(sparkConf)

    val N = sc.broadcast(10)
    val path = args(0)

    val input = sc.textFile(path)
    val pair = input.map(line => {
      val tokens = line.split(",")
      (tokens(2).toInt, tokens)
    })

    import Ordering.Implicits._
    val partitions = pair.mapPartitions(itr => {
      var sortedMap = SortedMap.empty[Int, Array[String]]
      itr.foreach { tuple =>
        {
          sortedMap += tuple
          if (sortedMap.size > N.value) {
            sortedMap = sortedMap.takeRight(N.value)
          }
        }
      }
      sortedMap.takeRight(N.value).toIterator
    })

    val alltop10 = partitions.collect()
    val finaltop10 = SortedMap.empty[Int, Array[String]].++:(alltop10)
    val resultUsingMapPartition = finaltop10.takeRight(N.value)

    //Prints result (top 10) on the console
    resultUsingMapPartition.foreach {
      case (k, v) => println(s"$k \t ${v.asInstanceOf[Array[String]].mkString(",")}")
    }

    // Below is additional approach which is more concise
    val moreConciseApproach = pair.groupByKey().sortByKey(false).take(N.value)

    //Prints result (top 10) on the console
    moreConciseApproach.foreach {
      case (k, v) => println(s"$k \t ${v.flatten.mkString(",")}")
    }

    // done
    sc.stop()
  }
}