7.10 The KafkaUtils.createDstream Approach
KafkaUtils.createDstream connects to Kafka through ZooKeeper. Receivers fetch data from Kafka, and all the data obtained by the receivers is stored in the Spark executors; Spark Streaming then launches jobs to process this data.
1. Import the dependency
<!-- Add the dependency for integrating Spark Streaming with Kafka -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.3.2</version>
</dependency>
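For projects built with sbt rather than Maven, the equivalent declaration would look like the following sketch, assuming the same Scala 2.11 build and Spark 2.3.2 version as the POM snippet above:

// sbt equivalent of the Maven dependency above (versions are assumptions carried over from the POM)
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.3.2"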
2. Create a Scala class to implement word counting
In the spark_chapter07 project, under the /src/main/scala/cn.itcast.dstream directory, create a Scala class named "SparkStreaming_Kafka_createDstream" that contains the Spark Streaming application implementing the word count.
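A minimal sketch of such a class is shown below. The 5-second batch interval and the consumer group name spark_receiver are illustrative assumptions; the ZooKeeper quorum and the topic name kafka_spark match the commands in the steps that follow. Note that in the spark-streaming-kafka-0-8 API the method itself is named KafkaUtils.createStream.

package cn.itcast.dstream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming_Kafka_createDstream {
  def main(args: Array[String]): Unit = {
    // Use enough local threads to run the receiver and process its data at the same time
    val sparkConf = new SparkConf()
      .setAppName("SparkStreaming_Kafka_createDstream")
      .setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // ZooKeeper quorum through which the receiver connects to Kafka
    val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    // Illustrative consumer group name (an assumption, not fixed by the text)
    val groupId = "spark_receiver"
    // Map of topic name -> number of receiver threads for that topic
    val topics = Map("kafka_spark" -> 1)
    // Receiver-based input stream; each element is a (key, message) pair
    val kafkaDstream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
    // Keep only the message body, split it into words, and count each word per batch
    val wordCounts = kafkaDstream.map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}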
3. Create a topic to specify the message category
[root@hadoop01 ~]# kafka-topics.sh --create --topic kafka_spark --partitions 3
--replication-factor 1 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "kafka_spark".
4. Start the Kafka message producer and observe the output on the IDEA console
[root@hadoop01 servers]# kafka-console-producer.sh --broker-list hadoop01:9092 --topic kafka_spark
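After the producer starts, type a line of space-separated words into it, for example:

hadoop spark hadoop

Assuming the sketch class above is running, the IDEA console should then print the word counts for the batch containing that line, along the lines of the following illustrative output (the batch timestamp and the ordering of the pairs will differ from run to run):

-------------------------------------------
Time: <batch time> ms
-------------------------------------------
(hadoop,2)
(spark,1)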