---
date: 2016-06-06 13:36
status: public
title: DStream
---
## Types of DStream
1. Input DStreams that generate the input RDD for each batch interval
2. DStreams that perform transformations and aggregations
3. DStreams that perform output operations
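A minimal sketch showing all three kinds in one streaming program (the host, port, and word-count logic are illustrative, not taken from these notes):
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamKinds {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("dstream-kinds").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // 1. input DStream: one RDD is generated per batch interval
    val lines = ssc.socketTextStream("localhost", 9999)

    // 2. transformation/aggregation DStreams: each wraps the RDDs of its parent
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // 3. output DStream: registers the output operation that triggers job generation
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```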
## Responsibilities of a DStream
A DStream generates the RDDs that correspond to its operation. For an input DStream, the compute method builds the initial RDD from the data source; for transformation and aggregation DStreams, compute applies the transformation to the RDDs generated by the parent DStream(s). The transformation DStreams include:
```scala
class MappedDStream[T: ClassTag, U: ClassTag](parent: DStream[T], mapFunc: T => U) extends DStream[U](parent.ssc)
class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](parent: DStream[(K, V)], mapValueFunc: V => U) extends DStream[(K, U)](parent.ssc)
class MapPartitionedDStream[T: ClassTag, U: ClassTag](parent: DStream[T], mapPartFunc: Iterator[T] => Iterator[U], preservePartitioning: Boolean) extends DStream[U](parent.ssc)
class FlatMappedDStream[T: ClassTag, U: ClassTag](parent: DStream[T], flatMapFunc: T => TraversableOnce[U]) extends DStream[U](parent.ssc)
class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](parent: DStream[(K, V)], flatMapValueFunc: V => TraversableOnce[U]) extends DStream[(K, U)](parent.ssc)
class TransformedDStream[U: ClassTag](parents: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) => RDD[U]) extends DStream[U](parents.head.ssc)
class FilteredDStream[T: ClassTag](parent: DStream[T], filterFunc: T => Boolean) extends DStream[T](parent.ssc)
class GlommedDStream[T: ClassTag](parent: DStream[T]) extends DStream[Array[T]](parent.ssc) // turns each partition of the parent RDD into a single array, keeping the number of partitions
class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](parent: DStream[(K, V)], createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true) extends DStream[(K, C)](parent.ssc)
class ReducedWindowedDStream[K: ClassTag, V: ClassTag](parent: DStream[(K, V)], reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, filterFunc: Option[((K, V)) => Boolean], _windowDuration: Duration, _slideDuration: Duration, partitioner: Partitioner) extends DStream[(K, V)](parent.ssc) {
  override def compute(validTime: Time): Option[RDD[(K, V)]] = {
    val reduceF = reduceFunc
    val invReduceF = invReduceFunc

    val currentTime = validTime
    val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
      currentTime)
    val previousWindow = currentWindow - slideDuration

    logDebug("Window time = " + windowDuration)
    logDebug("Slide time = " + slideDuration)
    logDebug("ZeroTime = " + zeroTime)
    logDebug("Current window = " + currentWindow)
    logDebug("Previous window = " + previousWindow)

    //  _____________________________
    // |  previous window   _________|___________________
    // |___________________|       current window        |  --------------> Time
    //                     |_____________________________|
    //
    // |________ _________|          |________ _________|
    //          |                             |
    //          V                             V
    //       old RDDs                       new RDDs
    //

    // Get the RDDs of the reduced values in "old time steps"
    val oldRDDs =
      reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
    logDebug("# old RDDs = " + oldRDDs.size)

    // Get the RDDs of the reduced values in "new time steps"
    val newRDDs =
      reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
    logDebug("# new RDDs = " + newRDDs.size)

    // Get the RDD of the reduced value of the previous window
    val previousWindowRDD =
      getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K, V)]()))

    // Make the list of RDDs that needs to cogrouped together for reducing their reduced values
    val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs

    // Cogroup the reduced RDDs and merge the reduced values
    val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]],
      partitioner)
    // val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _

    val numOldValues = oldRDDs.size
    val numNewValues = newRDDs.size

    val mergeValues = (arrayOfValues: Array[Iterable[V]]) => {
      if (arrayOfValues.size != 1 + numOldValues + numNewValues) {
        throw new Exception("Unexpected number of sequences of reduced values")
      }
      // Getting reduced values "old time steps" that will be removed from current window
      val oldValues = (1 to numOldValues).map(i => arrayOfValues(i)).filter(!_.isEmpty).map(_.head)
      // Getting reduced values "new time steps"
      val newValues =
        (1 to numNewValues).map(i => arrayOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)

      if (arrayOfValues(0).isEmpty) {
        // If previous window's reduce value does not exist, then at least new values should exist
        if (newValues.isEmpty) {
          throw new Exception("Neither previous window has value for key, nor new values found. " +
            "Are you sure your key class hashes consistently?")
        }
        // Reduce the new values
        newValues.reduce(reduceF) // return
      } else {
        // Get the previous window's reduced value
        var tempValue = arrayOfValues(0).head
        // If old values exist, then inverse-reduce them from the previous value
        if (!oldValues.isEmpty) {
          tempValue = invReduceF(tempValue, oldValues.reduce(reduceF))
        }
        // If new values exist, then reduce them with the previous value
        if (!newValues.isEmpty) {
          tempValue = reduceF(tempValue, newValues.reduce(reduceF))
        }
        tempValue // return
      }
    }

    val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]]
      .mapValues(mergeValues)

    if (filterFunc.isDefined) {
      Some(mergedValuesRDD.filter(filterFunc.get))
    } else {
      Some(mergedValuesRDD)
    }
  }
}
class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](parent: DStream[(K, V)], updateFunc: Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)], partitioner: Partitioner, preservePartitioning: Boolean, initialRDD: Option[RDD[(K, S)]]) extends DStream[(K, S)](parent.ssc)
class MapWithStateDStreamImpl[KeyType: ClassTag, ValueType: ClassTag, StateType: ClassTag, MappedType: ClassTag](dataStream: DStream[(KeyType, ValueType)], spec: StateSpecImpl[KeyType, ValueType, StateType, MappedType]) extends DStream[MappedType](dataStream.context)
```
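These window and state DStreams back the user-facing operators `reduceByKeyAndWindow`, `updateStateByKey`, and `mapWithState`. A hedged usage sketch, reusing `ssc` and `lines` from the sketch above (the checkpoint path, durations, and counting logic are illustrative):
```scala
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

// Checkpointing is required for both the inverse-reduce window and the state operators.
ssc.checkpoint("/tmp/spark-checkpoint")   // path is illustrative

val pairs: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))

// Backed by ReducedWindowedDStream: the inverse function lets each new window be
// computed incrementally, adding the new batches and subtracting the batches that
// slid out, instead of re-reducing the whole window.
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,   // reduceFunc
  (a: Int, b: Int) => a - b,   // invReduceFunc
  Seconds(30),                 // window duration
  Seconds(10))                 // slide duration

// Backed by StateDStream: keeps a running total per key across all batches.
val runningCounts = pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))
}

windowedCounts.print()
runningCounts.print()
```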
## Input DStreams
1. InputDStream is the abstraction for all input streams. It covers two main responsibilities: creating a Receiver on the worker side to receive data, and, on the driver side, turning the received data into a new RDD on each tick of the batch timer (via DStream.generateJob(time: Time)).
For input streams, the `def compute(validTime: Time): Option[RDD[T]]` method is what produces the input RDD.
- FileInputDStream
Monitors a given directory on Hadoop HDFS; on each tick of the batch timer it picks up all files newly created within the current time slice and builds a corresponding number of NewHadoopRDDs (one RDD per file).
```scala
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
  // Find new files
  val newFiles = findNewFiles(validTime.milliseconds)
  logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
  batchTimeToSelectedFiles += ((validTime, newFiles))
  recentlySelectedFiles ++= newFiles
  val rdds = Some(filesToRDD(newFiles))
  // Copy newFiles to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "files" -> newFiles.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
  val inputInfo = StreamInputInfo(id, 0, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
  rdds
}
```
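On the user side, FileInputDStream is what `StreamingContext.textFileStream` / `fileStream` return; a minimal usage sketch (the directory path is illustrative):
```scala
// Each batch interval, the compute() above picks up the files that appeared in the
// monitored directory during the current time slice and turns them into RDDs.
val fileLines = ssc.textFileStream("hdfs:///data/incoming")   // path is illustrative
fileLines.count().print()
```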
- ReceiverInputDStream
The abstraction for all DStreams that receive data as a stream. To define a new input stream you implement ***getReceiver(): Receiver***, which supplies the Receiver that receives the data on the worker side. From the block information tracked by the driver-side ReceivedBlockTracker (blocks that the worker-side Receivers have already received and stored into the worker-side BlockManager), it generates a BlockRDD or a WriteAheadLogBackedBlockRDD.
```scala
/**
 * Generates RDDs with blocks received by the receiver of this stream.
 */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
```
The Receivers created by ReceiverInputDStream implementations start doing work only after StreamingContext.start() has been called: the ReceiverTracker component submits a Spark job that runs the data-receiving logic on the worker side.
> Receiver.onStart() must not block; it is called when the worker-side ReceiverSupervisorImpl is initialized. Think of ReceiverSupervisorImpl as managing the Receiver's life cycle on the worker, while the Receiver defines a long-running service that continuously pulls data from the source. Receiver.onStart() should set up the resources the service needs and start the service on a child thread; Receiver.onStop() should release those resources. ReceiverSupervisorImpl uses a CountDownLatch(1) to keep the task from finishing, which is how the service keeps running for a long time.
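As a hedged illustration of that life cycle, a minimal custom Receiver might look like the following sketch (the socket-based source, class name, and error handling are illustrative, not from these notes):
```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // onStart must return quickly: set up resources and hand the blocking
  // receive loop to a child thread, as described above.
  override def onStart(): Unit = {
    new Thread("line-receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Clean-up hook; the reading thread notices isStopped() and exits on its own.
  override def onStop(): Unit = { }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line)              // hand each record to the ReceiverSupervisor
        line = reader.readLine()
      }
      restart("Stream ended, trying to reconnect")
    } catch {
      case e: java.net.ConnectException =>
        restart("Could not connect to " + host + ":" + port, e)
    } finally {
      if (socket != null) socket.close()
    }
  }
}

// Illustrative usage: wire the custom receiver into a ReceiverInputDStream.
// val lines = ssc.receiverStream(new LineReceiver("localhost", 9999))
```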