Introduction

Spark course lab: building a real-time analytics dashboard with Spark + Kafka

Materials download (Baidu Cloud)

Environment preparation

Software versions

  • VMware: 16.2.1
  • Ubuntu: 16.04
  • Hadoop: 2.7.1
  • Spark: 2.1.0
  • Scala: 2.11.8
  • Kafka: 0.8.2.2, 0.10.2.0
  • Python: 3.6

Installation guide

System environment

# Update package sources (mirror: http://mirrors.aliyun.com/ubuntu/)
sudo apt-get update
sudo apt-get install vim
sudo apt-get install openssh-server
# Add the hadoop user
sudo useradd -m hadoop -s /bin/bash
sudo passwd hadoop

Configure a static IP: vim /etc/network/interfaces

auto lo
iface lo inet loopback
auto ens33
iface ens33 inet static
address 192.168.41.129
netmask 255.255.255.0
gateway 192.168.41.2
dns-nameservers 8.8.8.8 114.114.114.114

Restart networking for the changes to take effect:

sudo /etc/init.d/networking restart
ifconfig

Environment variables: vi /etc/profile

export JAVA_HOME=/usr/local/java
export SCALA_HOME=/usr/local/scala
export MAVEN_HOME=/usr/local/maven
export GRADLE_HOME=/usr/local/gradle
export CLASSPATH=.:${JAVA_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH:$MAVEN_HOME/bin:$SCALA_HOME/bin:$GRADLE_HOME/bin

Hadoop

# Passwordless SSH
ssh-keygen -t rsa  
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys
ssh localhost pwd
# Install
tar -zxvf hadoop-2.7.1.tar.gz  -C /usr/local/
cd /usr/local/
ln -snf hadoop-2.7.1 hadoop
chown -R hadoop:hadoop hadoop*

Edit environment variables: vi ~/.bashrc

export HADOOP_HOME=/usr/local/hadoop
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export SPARK_HOME=/usr/local/spark
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SPARK_HOME/bin:$KAFKA_HOME/bin

Edit the configuration files:
vi /usr/local/hadoop/etc/hadoop/core-site.xml

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>file:/usr/local/hadoop/tmp</value>
        <description>Abase for other temporary directories.</description>
    </property>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

vi /usr/local/hadoop/etc/hadoop/hdfs-site.xml

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/usr/local/hadoop/tmp/dfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/usr/local/hadoop/tmp/dfs/data</value>
    </property>
</configuration>

Format the NameNode and start HDFS:

hadoop namenode -format
start-dfs.sh
# Test
hdfs dfs -mkdir -p /user/hadoop/input
hdfs dfs -put ${HADOOP_HOME}/etc/hadoop/*.xml /user/hadoop/input
hadoop jar ${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar grep input output 'dfs[a-z.]+'
hdfs dfs -cat /user/hadoop/output/*

Kafka

# Install
tar -zxvf kafka_2.11-0.10.2.0.tgz  -C /usr/local/
cd /usr/local/
ln -snf kafka_2.11-0.10.2.0 kafka
chown -R hadoop:hadoop kafka*
cd /usr/local/kafka
# Start ZooKeeper (create the nohup log directory first)
mkdir -p logs
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > logs/zookeeper.out 2>&1 &
# Start Kafka
nohup bin/kafka-server-start.sh config/server.properties > logs/kafka.out 2>&1 &

Common operations

# Create a topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab
# List all topics
kafka-topics.sh --list --zookeeper localhost:2181
# Console producer
kafka-console-producer.sh --broker-list localhost:9092 --topic dblab
# Console consumer
kafka-console-consumer.sh --zookeeper localhost:2181 --topic dblab --from-beginning
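
Once kafka-python is installed (see the Python section below), the broker can also be smoke-tested from Python. This is a minimal sketch, assuming the broker listens on localhost:9092 and the dblab topic created above exists:

# Round-trip check with kafka-python: send one message to "dblab" and read it back
from kafka import KafkaConsumer, KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('dblab', b'hello from python')
producer.flush()

consumer = KafkaConsumer('dblab',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=5000)  # give up after 5 s of silence
for msg in consumer:
    print(msg.value.decode('utf8'))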

Spark

# Install
tar -zxvf spark-2.1.0-bin-without-hadoop.tgz  -C /usr/local/
cd /usr/local/
ln -snf spark-2.1.0-bin-without-hadoop spark
chown -R hadoop:hadoop spark*
# When using the Kafka integration, copy the corresponding jar under Spark's jars directory
cp spark-streaming-kafka-0-8_2.11-2.1.0.jar /usr/local/spark/jars/

Edit the environment script: vi /usr/local/spark/conf/spark-env.sh

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*
# Specify the Python interpreter to use
export PYSPARK_PYTHON=/usr/bin/python3
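
Once Python 3.6 is installed (next section), you can check from a pyspark shell that the interpreter set by PYSPARK_PYTHON is actually picked up; a minimal sketch:

# Run inside a `pyspark` shell started after this configuration
import sys
print(sys.version)      # expect 3.6.x, i.e. the interpreter configured via PYSPARK_PYTHON
print(sys.executable)   # expect the python3 path configured above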

Python

# Install build dependencies
apt-get install zlib*
apt-get install openssl
apt-get install libssl-dev
# Download and build
wget http://npm.taobao.org/mirrors/python/3.6.14/Python-3.6.14.tgz
tar -zxvf Python-3.6.14.tgz
cd Python-3.6.14
# Build with the ssl module (requires libssl-dev installed above)
./configure --prefix=/usr/local/python-3.6.14 --with-ssl
# Work around the missing lsb_release module error on Ubuntu
cp -f /usr/lib/python3/dist-packages/lsb_release.py /usr/local/python-3.6.14/lib/python3.6/
make && make install
# Create a symlink
cd /usr/local && ln -snf python-3.6.14/ python3.6 
# On Ubuntu, update the system default symlinks under /usr/bin
cd /usr/bin/
ln -snf /usr/local/python3.6/bin/python3 python3
ln -snf /usr/local/python3.6/bin/python3-config python3-config
ln -snf /usr/local/python3.6/bin/pip3 pip3
python3 --version && pip3 --version

Python dependencies

# Douban mirror: append -i http://pypi.douban.com/simple/ --trusted-host=pypi.douban.com
pip3 install Flask==2.0.3
pip3 install flask-socketio==4.3.1 
pip3 install python-engineio==3.13.2
pip3 install python-socketio==4.6.0
pip3 install kafka-python==2.0.2 
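
A quick, optional sanity check that the pinned packages import together:

# Print the installed version of each dependency
import flask, flask_socketio, engineio, socketio, kafka

for mod in (flask, flask_socketio, engineio, socketio, kafka):
    print(mod.__name__, getattr(mod, '__version__', 'unknown'))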

Running the application

Stream processing

Source code:
Utility class: StreamingExamples.scala

package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

Processing class: KafkaWordCount.scala

package org.apache.spark.examples.streaming

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.json4s._
import org.json4s.jackson.Serialization.write
import java.util.HashMap

object KafkaWordCount {
  implicit val formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Seconds(1), Seconds(1), 1).foreachRDD(rdd => {
      if (rdd.count != 0) {
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        val str = write(rdd.collect)
        val message = new ProducerRecord[String, String]("result", null, str)
        producer.send(message)
        producer.close()
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
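
The job publishes the windowed counts to the result topic as one JSON string per batch. Judging from how app.py (shown later) parses it, each message is a JSON array of single-key objects mapping a gender code to its count ("0" = female, "1" = male; any other codes are ignored by app.py). A hypothetical sample and how to read it in Python:

import json

# Hypothetical sample of one message on the "result" topic (the counts are made up)
sample = '[{"0": 123}, {"1": 98}]'
for item in json.loads(sample):
    for gender_code, count in item.items():
        print(gender_code, count)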

Dependency file: pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.apache.spark.examples.streaming</groupId>
  <artifactId>simple-project_2.11</artifactId>
  <version>1.0</version>
  <packaging>jar</packaging>

  <name>simple-project_2.11</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>

  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.json4s</groupId>
      <artifactId>json4s-jackson_2.11</artifactId>
      <version>3.2.11</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <repositories>
    <repository>
      <id>public</id>
      <name>aliyun nexus</name>
      <url>https://maven.aliyun.com/repository/public</url>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>public</id>
      <name>aliyun nexus</name>
      <url>https://maven.aliyun.com/repository/public</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </pluginRepository>
  </pluginRepositories>
</project>

Start the real-time stream processing job (the trailing arguments map to <zkQuorum> <group> <topics> <numThreads> in KafkaWordCount):

# Build and launch
cd /usr/local/spark/mycode/kafkatest1
mvn clean install -DskipTests

/usr/local/spark/bin/spark-submit \
--driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/kafka/* \
--class "org.apache.spark.examples.streaming.KafkaWordCount" \
/usr/local/spark/mycode/kafkatest1/target/simple-project_2.11-1.0.jar 127.0.0.1:2181 1 sex 1

Producer

cd /usr/local/spark/mycode/labproject/scripts
python3 producer.py 

Source (producer.py):

# coding: utf-8
import csv
import time

from kafka import KafkaProducer

# Send the gender column of each user_log record to the "sex" topic
producer = KafkaProducer(bootstrap_servers='192.168.41.129:9092')
csvfile = open("../data/user_log.csv", "r", encoding='utf-8')
reader = csv.reader(csvfile)

for line in reader:
  gender = line[9]
  if gender == 'gender':  # skip the CSV header row
    continue
  print(gender)
  producer.send('sex', gender.encode('utf8'))
  time.sleep(0.1)  # throttle so each 1-second window receives a steady trickle

Consumer

cd /usr/local/spark/mycode/labproject/scripts
python3 consumer.py

Source (consumer.py):

from kafka import KafkaConsumer

# Print every aggregated result the streaming job publishes to the "result" topic
consumer = KafkaConsumer('result', auto_offset_reset='earliest', bootstrap_servers='192.168.41.129:9092')
for msg in consumer:
  print(msg.value.decode('utf8'))

Start the whole system

# Start the Spark Streaming job
cd /usr/local/spark/mycode/kafkatest1
sh startup.sh

# Start the consumer
cd /usr/local/spark/mycode/labproject/scripts
python3 consumer.py

# Start the Flask web app
cd /usr/local/spark/mycode/labproject
python3 app.py

# Start the producer
cd /usr/local/spark/mycode/labproject/scripts
python3 producer.py

Open the dashboard at http://127.0.0.1:5000

Code structure: app.py and templates/index.html live under labproject/, with producer.py and consumer.py under scripts/ and user_log.csv under data/.

Entry point: app.py

import json

from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None
consumer = KafkaConsumer('result', bootstrap_servers='192.168.41.129:9092')


def background_thread():
  # Read the aggregated counts from the "result" topic and push them to the browser
  girl = 0
  boy = 0
  for msg in consumer:
    data_json = msg.value.decode('utf8')
    data_list = json.loads(data_json)
    for data in data_list:
      if '0' in data.keys():
        girl = data['0']
      elif '1' in data.keys():
        boy = data['1']
      else:
        continue
    result = str(girl) + ',' + str(boy)
    print(result)
    # Push "girl,boy" to all connected clients
    socketio.emit('test_message', {'data': result})

@socketio.on('test_connect')
def connect(message):
  print(message)
  global thread
  if thread is None:
    thread = socketio.start_background_task(target=background_thread)
  socketio.emit('connected', {'data': 'Connected'})

@app.route("/")
def handle_mes():
  return render_template("index.html")

if __name__ == '__main__':
  socketio.run(app, debug=True)
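
To exercise the backend without a browser, a small python-socketio client can connect, trigger the test_connect handler, and print whatever the server pushes on test_message. A minimal sketch, assuming app.py is running on http://127.0.0.1:5000 (the Engine.IO client also needs the requests package, which is not in the pip list above):

# Minimal python-socketio client for the dashboard backend
import socketio

sio = socketio.Client()

@sio.on('connected')
def on_connected(data):
    print('server says:', data)

@sio.on('test_message')
def on_test_message(data):
    # data['data'] is the "girl,boy" count string emitted by background_thread()
    print('counts:', data['data'])

sio.connect('http://127.0.0.1:5000')
sio.emit('test_connect', {'data': 'I am connected!'})
sio.wait()  # keep listening for pushed updates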