大数据实时计算系统实践

简介大数据实时计算系统实践

实时计算

简介

随着大数据的快速发展,业务场景越来越复杂,离线式的批处理框架 MapReduce 已经不能满足业务,大量的场景需要实时的数据处理结果来进行分析,决策。例如实时的用户推荐,在 618 这样的刺激环境下普通历史数据的推荐已经不能满足场景,就需要采集前分钟,甚至式前几秒的数据进行分析。实时计算适用于这种对历史数据依赖不强,短时间内变化较大的数据。用户行为分析,舆情分析,等等不断随环境和时间实时变化的数据都可能用到实时计算。

流程

实时的数据源:微博,微信,股票交易,银行流水,商城交易, 日志等数据。

消息中间件:Kafka,作为消息队列,提供数据的缓冲功能,同时也提供容错机制。

实时处理框架 (SparkStreaming):通过编写的 app 应用来拉取消息中间件的数据进行分布式的并行计算,处理和输出。

实时计算一定是基于分布式的并行计算框架的,单机对于短时间的高数据量远远达不到实时处理。

SparkStreaming

简介

SparkStreaming 是 Spark 提供的分布式的大数据实时计算框架,是基于 SparkCore(Spark 核心 API) 的扩展,他提供了动态的,高吞吐量的,可容错的流式数据处理。他可以从多个数据 Kafka,Flume,Kinesis,Twitter,Tcp scokets 中获取数据,然后使用复杂的算法和高级的函数算子如:map,reduce,join,window... 进行数据处理加工。最后可以将处理后的数据输出到文件系统,数据库,和可视化界面,同样也可以在数据流上使用机器学习和图形计算算法。
SparkStreaming 同 sparksql 一样在核心 RDD 上封装一种数据集DStream, 用于适应实时计算的特点, 类似于 sparksql 的 Dataset 和 DataFrame 用于方便交互式查询操作。
9add1de69872485caa9a92d17ddc8741-image.png

原理

接收实时的输入数据流,将数据拆分成多个 batch, 如每一秒的数据封装成一个 batch, 将每个 batch 交给 Spark 进行处理。最后将结果输出,同样的输出也是按 batch 来进行划分。
345810b0db7e4406be1727635e17174b-image.png

示例 SparkStreaming 的工作

StreamingContext

1. 用 conf 对象初始化 Streaming;

//scala
import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val sc   = new StreamingContext(conf,Seconds(1))

//scalaimport org.apache.spark._import org.apache.spark.streaming._val conf = newSparkConf().setAppName(appName).setMaster(master)val sc = new StreamingContext(conf,Seconds(1))

//java
import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

//javaimport org.apache.spark.*;import org.apache.spark.streaming.api.java.*;SparkConf conf = newSparkConf().setAppName(appName).setMaster(master);JavaStreamingContext ssc = new JavaStreamingContext(conf, newDuration(1000));

2. 用 context 对象初始化

//scala
import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

//scalaimport org.apache.spark.streaming._val sc = ... // existing SparkContextval ssc = new StreamingContext(sc, Seconds(1))

//java
import org.apache.spark.streaming.api.java.*;

JavaSparkContext sc = ...   //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

//javaimport org.apache.spark.streaming.api.java.*;JavaSparkContext sc = ... //existing JavaSparkContextJavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

appName,是用来在 Spark UI 上显示的应用名称。master,是一个 Spark、Mesos 或者 Yarn 集群的 URL,或者是 local[*] 运行在本地模式 (用于本地测试和单元测试)。
必须根据应用程序的延迟需求和可用的集群资源来设置批处理间隔batch interval

Discretized Streams(DStream)

Spark Streaming 提供了一种高级的抽象,叫做 DStream,英文全称为 Discretized Stream,中文翻译为“离散流”,它代表了一个持续不断的数据流。DStream 可以通过输入数据源来创建,比如 Kafka、Flume 和 Kinesis, 也可以通过对其他 DStream 应用高阶函数来创建,比如 map、reduce、join、window。!
DStream 的内部,其实一系列持续不断产生的 RDD。RDD 是 Spark Core 的核心抽象,即不可变的,分布式的弹性性数据集。DStream 中的每个 RDD 都包含了一个时间段内的数据。
ea19380297544753860c68e24c49939b-image.png
对 DStream 应用的算子,比如 flatmap,其实在底层会被翻译为对 DStream 中每个 RDD 的操作。比如对一个 DStream 执行一个 flatmap 操作,会产生一个新的 DStream。但是,在底层其原理为,对输入 DStream 中每个时间段的 RDD,都应用一遍 flatmap 操作,然后生成的新的 RDD,即作为新的 DStream 中的那个时间段的一个 RDD。底层的 RDD 的 transformation 操作,其实,还是由 Spark Core 的计算引擎来实现的。Spark Streaming 对 Spark Core 进行了一层封装,隐藏了细节,然后对开发人员提供了方便易用的高层次的 API。
b781236cf5264cc5a6ded56b512d5476-image.png

实时的 WordCount 应用

我们通过经典的 wordcount 来初步的了解 SparkStreaming 是如何进行实时的流处理的。

//基本数据源:
//Tcp Socket: `socketTextStream`
//file,Hdfs: `textFileStream("hdfs://...")`
private static  void ssBSDataSource() throws InterruptedException{
  SparkConf conf = new SparkConf().setAppName("ssBSDataSource").setMaster("local[2]");
  //创建JavaStreamingContext,设置时间延迟为1秒(每次收集前一秒的数据)
  JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));

  /*jsc.textFileStream("input/streaming");//file
    jsc.textFileStream("hdfs://...");     //hdfs 
  */  
  //从Socket中获取数据,监听端口9999,的得到的是ReceiverInputDStream
  JavaReceiverInputDStream lines =   jsc.socketTextStream("localhost",9999);
    //接下来就是wordcount的操作了。
  JavaDStream listDstream =  lines.flatMap(line-> Arrays.asList(line.split(" ")).iterator());
  JavaPairDStream pairDStream =  listDstream.mapToPair(x->new Tuple2<>(x,1));
  JavaPairDStream wordCount =  pairDStream.reduceByKey((x1,x2)->(x1+x2));

  wordCount.print();//默认输出前10条数据
  jsc.start();
  jsc.awaitTermination();//不断的不等待一段时间间隔进行一次执行。

  //直到我们使用jsc.close();
}

//基本数据源://Tcp Socket: `socketTextStream`//file,Hdfs: `textFileStream("hdfs://...")`private static void ssBSDataSource() throwsInterruptedException{ SparkConf conf = new SparkConf().setAppName("ssBSDataSource").setMaster("local[2]"); //创建JavaStreamingContext,设置时间延迟为1秒(每次收集前一秒的数据) JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1)); /*jsc.textFileStream("input/streaming");//file jsc.textFileStream("hdfs://..."); //hdfs */ //从Socket中获取数据,监听端口9999,的得到的是ReceiverInputDStream JavaReceiverInputDStream lines = jsc.socketTextStream("localhost",9999); //接下来就是wordcount的操作了。 JavaDStream listDstream = lines.flatMap(line-> Arrays.asList(line.split(" ")).iterator()); JavaPairDStream pairDStream = listDstream.mapToPair(x->new Tuple2<>(x,1)); JavaPairDStream wordCount = pairDStream.reduceByKey((x1,x2)->(x1+x2)); wordCount.print();//默认输出前10条数据 jsc.start(); jsc.awaitTermination();//不断的不等待一段时间间隔进行一次执行。 //直到我们使用jsc.close();}

创建StreamingContext->获取InputDStream->业务操作->输出->循环获取数据并操作,可见实时流处理相比普通的 RDD 操作并没有太多的不同之处,这里的 StreamingContext 相当于 SparkContext,SparkSession,DStream 相当与 RDD 和 Dataset, 是操作对象。一个不同是StreamingContext需要start()启动才能执行,另一个不同点在于 SparkStreaming 会不断的从端口获取实时数据,然后执行相同的操作。

SparkStreaming 的数据源

基础数据源: 文件 (file,HDFS),Socket,Akka Ator, 是内置支持的数据源。

Hdfs:JavaReceiverInputDStream lines=textFileStream("hdfs://spark1:9000/fileDir")
对于文件数据源的实时处理,会不断的去检查目录,一旦有移入或者重命名(只处理新加入目录和名称不同的新文件) 的文件就会进行数据的获取和处理,处理之后即使内容修改也不会再处理。需要强调的是数据格式必须一致。

高级数据源:Kafka,Flume,Kinesis,Twitter, 可以引用第三方依赖调用。

Kafka: 需要配置依赖, 或者直接在 jar 放到 Spark 的 jars 中一同加入 Library.Version=SparkVersion

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.1.1</version></dependency>

Flume 同 Kafka 配置:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>2.1.1</version>
</dependency>

<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-flume_2.10</artifactId><version>2.1.1</version></dependency>

依赖可以去Maven repository查询。具体应用后面再讲。

自定义数据源:以自定义数据源,来决定如何接受和存储。

DStream 和 Receiver

  • InputDStream 代表从数据源接收到的输入 DStream,在上面的实例中 lines 代表从 socket 中接收到的数据流的输入 DStream,除了文件数据流之外每个InputDstream都会绑定一个Receiver对象这个对象[Receiver]用于获取/接收数据并将其存储到Spark的内存中,以备后续使用。

  • 如果想要并行的接受多个数据流,我们可以创建多个 JavaReceierInputDStream, 这样每个 InputDStream 都会创建一个 Receiver,从而并行的接收多个数据流。但是 SparkStreamingAPP 在执行的时候是个持续工作的过程,Spark 的 Work/Executor 会独占一个 CPU Core, 所以说一旦 APP 运行,那么这个 CPU Core 就没法给其他 APP 使用。

    所以说我们要保证一个 APP 运行,在本地Local在起码保证要有大于2个线程【一个用于给处理 InputDStream 的 Executor 分配一个线程,一个用于 Receiver 接收数据。即一条接收数据,一条处理数据。】所以 setMaster("local[n]"),n 不能设置为 1,也不能直接 local,必须是要为 local[n] n>=2.
    在集群中 (不设置 Master) 必须要保证单个节点 Cpu Core>1,然后给每个 Executor 分配的 CPU Core 必须>1。这样才能保证才能保证 Executor 既可以执行 Receiver 数据的接收,又可以进行数据的处理。


因此,在我们配置 Spark 的时候必须给 Executor 配置 >1 的 CPU Core. 才能满足 SparkStreaming 的单任务执行。
特例:基于 Hdfs 文件的数据源是没有绑定 Receiver 的。因此不会占用一个 CPU Core.


本文转自:https://blog.51cto.com/14485508/2430870