10.sparkStreaming02

教程 野牛 ⋅ 于 2023-04-15 17:30:55 ⋅ 588 阅读

sparkStreaming02

21.4.2 updateStateByKey

java updateStateByKey方法 使用代码示例:

file

V2:上次数据

返回结果本次汇总数据,也就是下次的V2数据

scala代码:

package com.hainiu.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * calculate with state
 * keep all result
 */
object TestUpdateStateByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("updateStateByKey")
    conf.setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.checkpoint("file:///headless/workspace/spark/data/ckpt")
    ssc.socketTextStream("nn1",6666)
      .flatMap(_.split(" "))
      .map((_,1))
//      .reduceByKey(_+_)
      .updateStateByKey((curr:Seq[Int],last:Option[Int])=>{
        Option(curr.sum + last.getOrElse(0))
      })
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

结果:

第一次执行代码

file

打印日志

file

第二次执行代码:

发现上次的数据并没有加载进来,说明只接收了socket流,并没有加载checkpoint的数据。

file

21.4.3 streaming用checkpoint恢复历史数据

通过StreamingContext.getOrCreate()

该方法优先使用checkpoint 检查点的数据创建StreamingContext;如果checkpoint没有数据,则将通过调用提供的“ creatingFunc”来创建StreamingContext。

file

代码:

package com.hainiu.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * calculate with state
 * keep all result
 */
object TestUpdateStateByKeyWithRecovery {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("updateStateByKey")
    conf.setMaster("local[*]")
    val ssc = StreamingContext.getOrCreate("file:///headless/workspace/spark/data/ckpt",()=>{

      val ssc = new StreamingContext(conf, Seconds(5))
      ssc.checkpoint("file:///headless/workspace/spark/data/ckpt")
      ssc.socketTextStream("nn1", 6666)
        .flatMap(_.split(" "))
        .map((_, 1))
        //      .reduceByKey(_+_)
        .updateStateByKey((curr: Seq[Int], last: Option[Int]) => {
          Option(curr.sum + last.getOrElse(0))
        })
        .print()
      ssc
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

删除checkpoint 目录,第一次启动程序

运行日志:

file

第二次启动程序

运行日志:

file

说明先加载checkpoint的数据,然后接收的socket 的流数据。

21.4.4 updateStateByKey只使用最近更新的值

背景:

把流式数据每批次计算的结果持久到MySQL数据库。

用 updateStateByKey,会保留之前批次的数据,更新时,如果每次都要把所有 单词 做更新,效率太低。

如何能只将当前批次的数据做更新,这就需要 批次数据中带有状态,来区分是本次更新的数据还是以前的数据。

代码:

package com.hainiu.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * calculate with state
 * keep all result
 */
object TestUpdateStateByKeyWithRecoveryAndCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("updateStateByKey")
    conf.setMaster("local[*]")
    val ssc = StreamingContext.getOrCreate("file:///headless/workspace/spark/data/ckpt",()=>{

      val ssc = new StreamingContext(conf, Seconds(5))
      ssc.checkpoint("file:///headless/workspace/spark/data/ckpt")
      ssc.socketTextStream("nn1", 6666)
        .flatMap(_.split(" "))
        .map((_, IsUpdateVo(1,false)))
        //      .reduceByKey(_+_)
        .updateStateByKey((curr: Seq[IsUpdateVo], last: Option[IsUpdateVo]) => {
          val currnumber = curr.map(_.number).sum
          val lastnumber = if(last.isDefined) last.get.number else 0
          val result = if(currnumber>0) IsUpdateVo(currnumber+lastnumber,true) else IsUpdateVo(currnumber+lastnumber,false)
          Option(result)
        })
        .print()
      ssc
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
case class IsUpdateVo(var number:Int,var update:Boolean)

批次数据:

file

运行结果:

file

21.4.5 window 操作

sparkStreaming 支持 window 操作,当你需要跨批次去处理时就可以用,比如:统计过去10分钟的数据做均值、top(热词、热搜等)。

file

file

转换 描述
window(windowLength, slideInterval) 返回一个基于源DStream的窗口批次计算后得到新的DStream。
countByWindow(windowLength,slideInterval) 返回基于滑动窗口的DStream中的元素的数量。
reduceByWindow(func, windowLength,slideInterval) 基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStream。
reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks]) 基于滑动窗口对(K,V)键值对类型的DStream中的值按K使用聚合函数func进行聚合操作,得到一个新的DStream。可以进行repartition操作。
reduceByKeyAndWindow(func,invFunc,windowLength, slideInterval, [numTasks]) 更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作(func),并对离开窗口的老数据进行“逆向reduce” 操作(invFunc)。但是,只能用于“可逆的reduce函数”必须启用“检查点”才能使用此操作
countByValueAndWindow(windowLength,slideInterval, [numTasks]) 基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。与countByValue一样,reduce任务的数量可以通过一个可选参数进行配置。

21.4.5.1 window函数的使用

它是将多个批次的数据结果进行封装,变成一个整体进行执行,其实就是扩大了计算范围

window中的参数,如果设定了窗口大小,默认窗口滑动大小就是批次大小

整体代码如下:

package com.hainiu.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TestWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("window")
    conf.setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.socketTextStream("nn1",6666)
      .window(Seconds(20))
      //default sliding = batch-size
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

file

数据的变化逻辑

file

window函数中可以设置窗口大小和滑动大小

但是窗口的大小或者是滑动大小必须是批次间隔的整数倍

window(窗口大小,滑动大小)

object TestWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("window")
    conf.setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.socketTextStream("nn1",6666)
      .window(Seconds(20),Seconds(12))
      //default sliding = batch-size
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

21.4.5.2 reduceByKeyAndWindow函数的使用

window函数就是将数据进行累加,然后计算其中的结果数据

window进行累加+ reduceByKey进行聚合

简便的写法 reduceByKeyAndWindow()

      //      .window(Seconds(20))
//      .reduceByKey(_+_)
      .reduceByKeyAndWindow((a:Int,b:Int)=> a+b,Seconds(20),Seconds(10))

整体代码如下:

package com.hainiu.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TestWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("window")
    conf.setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.checkpoint("file:///headless/workspace/spark/data/ckpt")
    ssc.socketTextStream("nn1",6666)
//      .window(Seconds(20),Seconds(12))
      //default sliding = batch-size
      .flatMap(_.split(" "))
      .map((_,1))
//      .window(Seconds(20))
//      .reduceByKey(_+_)
      .reduceByKeyAndWindow((a:Int,b:Int)=> a+b,(a:Int,b:Int)=>a-b,Seconds(20),Seconds(10))
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

file

21.4.5.3 其他的window算子

整体代码

package com.hainiu.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TestWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("window")
    conf.setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.checkpoint("file:///headless/workspace/spark/data/ckpt")
    val ds = ssc.socketTextStream("nn1", 6666)
      .flatMap(_.split(" "))

//    ds.window(Seconds(10)).count()
    ds.countByWindow(Seconds(10),Seconds(5)).print()

//    ds.window(Seconds(10)).countByValue()
    ds.countByValueAndWindow(Seconds(10),Seconds(5)).print()

//    ds.window(Seconds(10)).reduce((a,b)=> a+"-"+b)
    ds.reduceByWindow((a,b)=> a+b,Seconds(10),Seconds(5)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

21.4.5 SparkStreaming 何时使用缓存?何时开启检查点?

分析sparkStreaming 什么时候使用缓存?

1)DStream 和 RDD相似,如果DStream中的数据将被多次计算(例如,对同一数据进行多次操作),这将很有用。可以调用 cache()或 persist() 方法缓存。

2)对于基于窗口的操作reduceByWindow和 reduceByKeyAndWindow和基于状态的操作updateStateByKey,由于窗口的操作生成的DStream会自动保存在内存中,而无需开发人员调用persist()。

分析 sparkStreaming 什么时候开启检查点checkpoint?

1)有状态转换的用法 -如果在应用程序中使用updateStateByKey或reduceByKeyAndWindow(带有反函数),则必须提供checkpoint目录以允许定期进行RDD的checkpoint。

2)从运行应用程序的驱动程序故障中恢复,checkpoint用于恢复进度信息。

包括:

配置:用于创建流应用程序的配置。

DStream 操作:定义流应用程序的一组 DStream 操作。

不完整的批次:作业已排队但尚未完成的批次。

21.4.6 多receiver源union的方式

当数据源多时,有多个数据源就有多个DStream,每个DStream生成自己的任务,为了提高运行效率,可以将多个数据源的流数据union在一起,进而达到减少task的目的。

每个socket 源,都需要有一个receiver接收数据,一个receiver 需要一个CPU核,运行的时候还需要CPU核。

也可以通过本地修改CPU核数,看是否能运行任务

日志提示,且没有CPU核运行任务:

file

通过 StreamingContext 的 union 方法把多个receiver 源 union到一起。

file

首先准备nc在nn2机器上

yum -y install nc
nc -l -k -p 6666

1)先union,再处理

package com.hainiu.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TestWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("window")
    conf.setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.checkpoint("file:///headless/workspace/spark/data/ckpt")
    val ds1 = ssc.socketTextStream("nn1", 6666)
    val ds2 = ssc.socketTextStream("nn2", 6666)
    //union
    val ds3 = ds1.union(ds2)
    ds3.flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

file

2)先加工,再union,然后再处理

package com.hainiu.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TestWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("window")
    conf.setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.checkpoint("file:///headless/workspace/spark/data/ckpt")
    val ds1 = ssc.socketTextStream("nn1", 6666)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    val ds2 = ssc.socketTextStream("nn2", 6666)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    //union
    ds1.union(ds2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

file

21.4.7 SparkStreaming输出到HDFS

file

socket 源如何分区?

当从socket 源接收数据时,receiver 会创建数据块。每blockInterval毫秒生成一个新的数据块。在batchInterval期间创建了N个数据块,其中N = batchInterval / blockInterval。

blockInterval 默认是200ms,如果 batchInterval 设置为 2s,理论这个批次会产生 10个分区。如果某个blockInterval 时间内没有数据,则这个blockInterval 时间 就没有产生数据块。也就是说,开启socket源的sparkStreaming程序,如果socket端不喂数据,那这个批次就不会产生数据块,进而分区数是0。

file

小文件就会带来一系列的问题:

小文件多会占用很大的namenode的元数据空间,下游使用小文件的JOB会产生很多个partition,如果是mr任务就会有很多个map,如果是spark任务就会有很多个task。

如何解决写出小文件问题?

4种方法:

1)增加批次间隔的大小。(不建议使用)

失去了流式计算的意义。

2)使用批处理任务进行小文件的合并。(不建议使用)

需要新开个任务将多个小文件合并成大文件。

3)使用coalesce 减少分区数,进而减少输出小文件的个数。

4)使用HDFS的append方式,追加写入文件中。

file

package com.hainiu.spark

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.io.PrintWriter
import java.text.SimpleDateFormat
import java.util.Date

object Test {
  def main(args: Array[String]): Unit = {
    val parent_path = "/spark_sink/"
    val file_name = "t.txt"
    val conf = new SparkConf()
    conf.setAppName("test sink")
    conf.setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(20))
    ssc.socketTextStream("nn1",6666)
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .foreachRDD(rdd=>{

        rdd.coalesce(2).mapPartitionsWithIndex((index,it)=>{
          val fs = FileSystem.get(new Configuration())
          val df = new SimpleDateFormat("yyyy/MM/dd/HH")
          val date_str = df.format(new Date())
          val all_path = parent_path+date_str+"/"+index+"_"+file_name
          var stream: FSDataOutputStream = null
          if(fs.exists(new Path(all_path))){
            stream = fs.append(new Path(all_path))
          }else{
            stream = fs.create(new Path(all_path))
          }
          val pw = new PrintWriter(stream,true)
          it.foreach(t=>{
            val line = t._1+"->"+t._2
            pw.println(line)
          })
          pw.close()
          fs.close()
          Iterator.empty
        }).foreach((t:String)=>{})

      })

    ssc.start()
    ssc.awaitTermination()
  }
}

代码执行

file

请注意:我们需要调节hdfs的链接的个数,也就是调小分区个数,增加批次间隔,不然会出现连接hdfs的时候因为连接数量过大产生问题,这个是spark的自动反压解决不了的,会出现很多的任务在队列中没办法执行

此段代码的注意点:

  • mapParittionsWithIndex算子实现分区创建链接并且获取分区下标作为文件名

  • 使用日期进行格式化替代路径一个小时写入一个文件夹中
  • 如果存在就追加不存在就创建
  • 使用转换类算子需要返回数据
  • 转换类算子需要触发运算执行

最根本的思想就是取出DStream中的RDD然后使用最原始的方式存储数据到外部

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-野牛,http://hainiubl.com/topics/76291
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter