IT星球论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

新浪微博账号登陆

只需一步,快速开始

搜索
查看: 53|回复: 0

Spark 入门

[复制链接]

1996

主题

1

好友

1万

积分

管理员

Rank: 9Rank: 9Rank: 9

优秀会员 助人为乐 辛勤工作 技术精英 多才多艺 优秀班竹 灌水天才 星球管理 宣传大使 灌水之王 财富勋章 版主勋章 动漫勋章 勤奋会员 论坛精英 PS高手 心 8 闪游皮肤 双鱼座 8★8➹ 志愿者 乖

发表于 2017-3-22 12:32:04 |显示全部楼层
Spark 入门

Spark 入门

目录

一、 Spark功能和优势

1. Spark功能

2. Spark处理数据分三步走

3. Spark优势

二、 Spark与MapReduce的比较

三、 Spark源码编译

1、 下载Spark源码

2、 编译要求

3、 注意事项

(1) 修改make-distribution.sh文件

(2) 配置镜像

(3) 修改下域名解析服务器配置

4、 mvn编译

5、 make-distribution进行打包编译

四、 安装Spark

1、 解压spark tar包

2、 检查环境

3、 配置spark-env.sh

4、 启动Spark

5、 Web页面

五、 SparkShell使用

1、 准备数据

2、 测试

六、 运行WordCountDemo

1、 读取数据

2、 处理数据

3、 保存数据

七、 SparkTopKey Demo

八、 RDD理解

1、 定义

2、 操作类型




一、 Spark功能和优势

1.Spark功能

Spark类似于MapReduce,是另一种分布式计算框架,由于MapReduce最大的痛点在于IO,包括硬盘IO和网络IO都成了限制计算的瓶颈,Spark是使用内存来极端,所以Spark是一种内存计算框架。将中间解决存入内存中,大大提高了计算的速度。不同于MapReduce只有map和reduce,Spark提供了上百种操作,功能强大。

2.Spark处理数据分三步走

² 读取数据:读取数据一般是从HDFS上读取数,如sc.textfile(‘/user/input’)

对于SparkCore来说,将数据变为RDD。

对于SparkSql来说,是将数据变为DataFrame

对于Streaming来说,将数据变为DStream

² 处理数据

对于SparkCore来说,调用RDD的一系列方法。

对于SparkSql来说,是调用df的一系列方法

对于Streaming来说,是调用dstream一系列方法

这些方法大部分是高阶函数。使用各种方法来在内存中处理数据。

² 输出数据:输出数据也大部分是存入硬盘,

sc.SaveAsTextFile

resultDF.write.jdbc()

resultDStream.foreach(RedisHBase)

3.Spark优势

Spark是对于海量数据的快速通用引擎。它的优势如下:

(1)快

Spark运行快的原因一是因为运行过程中将中间结果存入内存,二是因为Spark运行前会将运行过程生成一张DAG图(有向无环图)。

当处理的源数据在文件中时,比hadoop快10倍,当处理的源数据在内存中时,比Hadoop快100倍。

(2)通用

可以使用Core/SQL/Streaming/Graphx/MLib/R/StructStreaming(2.0)等进行Spark计算。

处理的数据通用:可以处理HDFS/Hive/HBase/ES、JSON/JDBC等数据

Spark运行模式:Spark可以运行在本地模式、集群模式,集群模式时,可以运行在YARN上、Mesos上、Standalone集群上、云端

(3)使用简单

可以使用Python、Scala、Java等开发。

二、 Spark与MapReduce的比较

     
      

MapReduce

      

Spark

        

数据存储结构

      

磁盘HDFS文件系统

      

使用内存构建弹性分布式数据集RDD对数据进行运算和缓存

        

编程范式

      

Map+Reduce

      

DAG(有向无环图):Transformation+action

        

中间结果存储

      

中间结果落地磁盘,IO及序列化反序列化代价比较大

      

中间结果存储在内存中,速度比磁盘多几个数量级

        

运行方式

      

Task以进程方式维护,任务启动慢

      

Task以线程方式维护,任务启动快

   

三、 Spark源码编译

1、 下载Spark源码

Spark源码下载:http://spark.apache.org/downloads.html

我们这里选择Spark1.6.1的源码进行编译。

2、 编译要求

Spark编译官方文档地址为:http://spark.apache.org/docs/1.6.1/building-spark.html

Spark源码编译有三种方式:SBT编译、Maven编译、打包编译

官方文档上讲到Spark1.6.1编译要求Maven版本最低是3.3.3,Java版本最低是7。

3、 注意事项

(1)修改make-distribution.sh文件

make-distribution.sh在源码的根目录下,脚本里有动态查找Spark版本、Scala版本、Hadoop版本、Hive版本的代码,如果编译时去计算会很慢,可以直接将版本写死,可以提高编译速度。

     

原来的配置:

  

VERSION=$("$MVN"  help:evaluate -Dexpression=project.version $@ 2>/dev/null | grep -v  "INFO" | tail -n 1)

  

SCALA_VERSION=$("$MVN"  help:evaluate -Dexpression=scala.binary.version $@ 2>/dev/null

  

|  grep -v "INFO"

  

|  tail -n 1)

  

SPARK_HADOOP_VERSION=$("$MVN"  help:evaluate -Dexpression=hadoop.version $@ 2>/dev/null

  

|  grep -v "INFO"

  

|  tail -n 1)

  

SPARK_HIVE=$("$MVN"  help:evaluate -Dexpression=project.activeProfiles -pl sql/hive $@  2>/dev/null

  

|  grep -v "INFO"

  

|  fgrep --count "hive";

  

#  Reset exit status to 0, otherwise the script stops here if the last grep  finds nothing

  

#  because we use "set -o pipefail"

  

  echo -n)

        

修改为:

  

VERSION=1.6.1

  

SCALA_VERSION=2.10.4

  

SPARK_HADOOP_VERSION=2.5.0-cdh5.3.6

  

SPARK_HIVE=1

   

这里版本号一定要跟实际的情况一致。

VERSION是Spark的版本号

SPARK_HIVE为1是支持Hive,0是不支持hive

(2)配置镜像

[hadoop@spark01-61cdh apache-maven-3.3.3]$vim /opt/modules/apache-maven-3.3.3/conf/settings.xml

     
  

  mirrorId

  

  repositoryId

  

  Human Readable Name for this Mirror.

  

  http://my.repository.com/repo/path

  

  

   

(3)修改下域名解析服务器配置

[hadoop@spark01-61cdh apache-maven-3.3.3]$sudo vim /etc/resolv.conf

     

nameserver 8.8.8.8

  

nameserver 8.8.4.4

   

4、 mvn编译

[hadoop@spark01-61cdh spark-1.6.1]$ mvnclean package -DskipTest -Phadoop-2.5 -Dhadoop.version=2.5.0 -Pyarn -Phive -Phive-thriftserver-Dmaven.test.skip=true -Dmaven.test.skip=true -e

² OutOfMemoryError错误

参考:https://cwiki.apache.org/conflue ... EN/OutOfMemoryError

是因为JVM的可用内存太少,需要手动调整Meven的JVM可用内存量。

配置环境变量:exportMAVEN_OPTS="-Xmx1024m -XX:MaxPermSize=1024m"

² MojoFailureException错误

5、 make-distribution进行打包编译

(1)命令参数说明

./make-distribution.sh --name custom-spark--tgz -Psparkr -Phadoop-2.5-Phive -Phive-thriftserver –Pyarn

² name参数是指编译完成后tar包的名称,比如spark-1.6.1-bin-2.5.0.tar.gz

² -Phadoop-2.5是指使用hadoop2.5版本

² -Phive是指定spark支持hive

² -Phive-thriftserver是指定支持hive-thriftserver

² -Pyarn是指定支持yarn

四、 安装Spark

1、 解压spark tar包

[hadoop@spark01-61cdh software]$ tar -zxf/opt/software/spark-1.6.1-bin-2.5.0-cdh5.3.6.tgz -C /opt/modules

2、 检查环境

     

检查Java是否已经安装好

  

[hadoop@spark01-61cdh  spark-1.6.1-bin-2.5.0-cdh5.3.6]$ java -version

  

java version  "1.7.0_67"

  

Java(TM) SE Runtime  Environment (build 1.7.0_67-b01)

  

Java HotSpot(TM)  64-Bit Server VM (build 24.65-b04, mixed mode)

        

检查Scala是否已经安装好

  

[hadoop@spark01-61cdh  spark-1.6.1-bin-2.5.0-cdh5.3.6]$ scala -version

  

Scala code runner  version 2.10.4 -- Copyright 2002-2013, LAMP/EPFL

        
   

3、 配置spark-env.sh

从模板复制一个配置文件

[hadoop@spark01-61cdhspark-1.6.1-bin-2.5.0-cdh5.3.6]$ cp conf/spark-env.sh.templateconf/spark-env.sh

在spark-env.sh添加配置:

     

JAVA_HOME=/opt/modules/jdk1.7.0_67

  

SCALA_HOME=/opt/modules/scala-2.10.4/bin

  

HADOOP_CONF_DIR=/opt/modules/hadoop-2.5.0-cdh5.3.6/etc/hadoop

   

4、 启动Spark

[hadoop@spark01-61cdhspark-1.6.1-bin-2.5.0-cdh5.3.6]$ bin/spark-shell

这样就进入到了Spark的shell交互式命令行。

这里请注意,在启动时的日志里有一句:

     

16/10/12 23:40:36  INFO repl.SparkILoop: Created spark context..

  

Spark context  available as sc.

   

这里意思是spark会创建一个context对象叫做sc。这个sc是SparkContext,它是SparkCore的程序入口,SparkContext会创建一个RDD。

5、 Web页面

启动shell后,可用通过4040端口的Web页面查看监控页面。

五、 Spark Shell使用

1、 准备数据

将spark根目录下的README.md文件上传到HDFS上去

[hadoop@spark01-61cdhspark-1.6.1-bin-2.5.0-cdh5.3.6]$ hdfs dfs -put README.md /

2、 测试

加载文件到rdd:scala> valrdd=sc.textFile("/README.md")

计算多少行:scala>rdd.count

计算包含Spark关键字的行数:scala>rdd.filter(line=>line.contains("Spark")).count

取前5行数据:scala> rdd.take(5)

六、 运行WordCount Demo

按照大数据处理三步走:

1、 读取数据

scala> valrdd=sc.textFile("/input.txt")

这个sc是一个SparkContext对象,textFile方法是读取HDFS上的文件,读取文件后,赋值给一个RDD对象。后续的操作都是用户RDD来操作的。

2、 处理数据

² scala>var wordcountRdd=rdd.flatMap(line => line.split(" ")).map(word=>(word,1)).reduceByKey((a,b)=>(a+b))

flatMap是将文件中的内容根据空格分隔开后,变换为一个单词数组。

map是针对每一个单词生成一个键值对,键为单词,值为1。

reduceByKey是将每一个键值对的值根据key进行合并相加,来统计各个单词的个数。

² scala>wordcountRdd.count

² scala>wordcountRdd.take(11)

res4: Array[(String, Int)] = Array((min,1),(hive,2), (word,1), (hua,2), (hello,1), (zhongh,1), (spark,2), (hadoop,2),(ren,2), (work,1), (storm,1))

3、 保存数据

scala>wordcountRdd.saveAsTextFile("/spark-out")

保存到HDFS根目录下的spark-out目录下

七、 Spark TopKey Demo

     

1、加载数据

  

scala> val  rdd=sc.textFile("/input.txt")

  
  

2、处理数据

  

scala>  var wordcountRdd=rdd.flatMap(line => line.split(" ")).map(word  =>(word,1)).reduceByKey((a,b)=>(a+b))

  
  

3、根据个数排序后,取前5个

  

scala>  wordcountRdd.map(tuple=>(tuple._2,tuple._1)).sortByKey(false).map(tuple=>(tuple._2,tuple._1)).take(5)

  
  

链式编程:

  

sc.textFile("/input.txt").  flatMap(line => line.split(" ")).map(word  =>(word,1)).reduceByKey((a,b)=>(a+b)).  map(tuple=>(tuple._2,tuple._1)).sortByKey(false).map(tuple=>(tuple._2,tuple._1)).take(5)

   

八、 RDD理解

1、 定义

RDD是弹性分布式数据集(Resilient Distributed Dataset)的简称,其实就是分布式元素集合。在Spark中,对数据的所有操作不外乎创建RDD、转化已有的RDD、调用RDD操作进行求值。

2、 操作类型

RDD有两种类型的操作:Transformation操作、Action操作,Transformation操作和Action操作区别在于Spark计算RDD的方式不同。

² Transformation操作会由一个RDD生成另一个新的RDD,生成的新的RDD是惰性求值的,只有在Action操作时才会被计算。

² Action操作会对RDD计算出一个结果,并把结果返回到驱动器程序中,或者是把结果存储到外部存储系统中。



来自为知笔记(Wiz)Spark 入门

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册 新浪微博账号登陆

该会员没有填写今日想说内容.
您需要登录后才可以回帖 登录 | 立即注册 新浪微博账号登陆

回顶部