7.10 The KafkaUtils.createDstream Approach

KafkaUtils.createDstream connects to Kafka through ZooKeeper. Receivers fetch data from Kafka, and all the data the receivers obtain is stored in the Spark executors; Spark Streaming then launches jobs to process that data.
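Note that in the spark-streaming-kafka-0-8 API the method is actually named KafkaUtils.createStream. The following is a minimal sketch of the call shape, assuming an existing StreamingContext named ssc; the consumer group id is a hypothetical name, and the full program follows in step 2:

import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-based stream: a receiver pulls data from Kafka and stores it in executors
val kafkaStream = KafkaUtils.createStream(
  ssc,                                          // an existing StreamingContext
  "hadoop01:2181,hadoop02:2181,hadoop03:2181",  // ZooKeeper quorum used to connect
  "spark_group",                                // consumer group id (hypothetical)
  Map("kafka_spark" -> 1)                       // topic -> number of receiver threads
)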

1.  Import the dependency

# Add the dependency for the Spark Streaming + Kafka integration

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.3.2</version>
</dependency>
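The artifact name encodes both the targeted Kafka API version (0-8) and the Scala version (2.11); the dependency version should match the Spark version in use, 2.3.2 here.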

2.  Create a Scala class to implement word count

In the spark_chapter07 project, under the /src/main/scala/cn.itcast.dstream directory, create a Scala class named "SparkStreaming_Kafka_createDstream" in which to write the Spark Streaming application that implements word count, as sketched below.
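The following is a minimal sketch of such a program, assuming the kafka_spark topic created in step 3; the consumer group id, checkpoint directory, and batch interval are illustrative choices, not fixed by the text:

package cn.itcast.dstream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming_Kafka_createDstream {
  def main(args: Array[String]): Unit = {
    // local[4]: the receiver occupies one thread, so more than one is required
    val sparkConf = new SparkConf()
      .setAppName("SparkStreaming_Kafka_createDstream")
      .setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("./Kafka_Receiver") // hypothetical checkpoint directory
    // Connect through ZooKeeper; "spark_group" is a hypothetical consumer group id
    val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    val topics = Map("kafka_spark" -> 1) // topic -> number of receiver threads
    val kafkaDStream: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(ssc, zkQuorum, "spark_group", topics)
    // Each record is a (key, message) pair; the message holds the text
    val words: DStream[String] = kafkaDStream.map(_._2).flatMap(_.split(" "))
    val wordCounts: DStream[(String, Int)] = words.map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}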

3.  Create a topic to specify the message category

[root@hadoop01 ~]# kafka-topics.sh --create --topic kafka_spark --partitions 3 \
--replication-factor 1 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181

WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.

Created topic "kafka_spark".

4.  Start a Kafka console producer and watch the output in the IDEA console

[root@hadoop01 servers]# kafka-console-producer.sh --broker-list hadoop01:9092 --topic kafka_spark
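For example, typing lines such as the following into the producer (hypothetical input, one Kafka message per line):

hadoop spark kafka
spark spark streaming

should, assuming both lines land in the same batch and the sketch program from step 2 is running, print per-batch counts in the IDEA console roughly like:

-------------------------------------------
Time: ... ms
-------------------------------------------
(spark,3)
(hadoop,1)
(kafka,1)
(streaming,1)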
