搭建Spark计算平台+python操作Spark - 简书


本站和网页 https://www.jianshu.com/p/b999493f316a 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

搭建Spark计算平台+python操作Spark - 简书登录注册写文章首页下载APP会员IT技术搭建Spark计算平台+python操作SparkByte猫关注赞赏支持搭建Spark计算平台+python操作Spark一、Spark安装及服务启动
Apache Spark是一种快速的集群计算技术,专为快速计算而设计。它基于Hadoop MapReduce,它扩展了MapReduce模型,以有效地将其用于更多类型的计算,包括交互式查询和流处理。 Spark的主要特性是它的内存中集群计算,提高了应用程序的处理速度(Spark 因为 RDD 是基于内存的,可以比较容易切成较小的块来处理。如果能对这些小块处理得足够快,就能达到低延时的效果)。
比起 Hadoop MapReduce, Spark 本质上就是基于内存的更快的批处理,然后用足够快的批处理来实现各种场景
1、安装Scala
下载并解压Scala
cd /opt/scala
wget https://downloads.lightbend.com/scala/2.11.7/scala-2.11.7.tgz
tar -zxf scala-2.11.7.tgz
将Scala添加到环境变量
vi /etc/profile
在最后面添加
export SCALA_HOME=/opt/scala/scala-2.11.7
export PATH=$PATH:$SCALA_HOME/bin
激活配置
source /etc/profile
2、Spark下载
从官网下载和自己hadoop版本相匹配的spark安装包
3、解压安装文件并配置环境变量
(1)解压安装文件
解压安装文件到指定的的文件夹 /opt/spark
tar -zxvf spark-2.2.0-bin-hadoop2.7.tgz -C opt/spark
修改文件夹名字
cd /opt/spark/
mv spark-2.2.0-bin-hadoop2.7 spark-2.2.0
(2)配置环境变量
export SPARK_HOME=/opt/spark/spark-2.2.0
export PATH=$SPARK_HOME/bin:$PATH
4、配置Spark
需要修改的配置文件有两个
spark-env.sh,spark-defaults.conf
(1)配置spark-env.sh
cd /opt/spark/spark-2.2.0/conf
mv spark-env.sh.template spark-env.sh
vim spark-env.sh
配置如下内容
# 配置JAVA_HOME
export JAVA_HOME=/opt/java/jdk1.8.0_144
# 配置SCALA_HOME
export SCALA_HOME=/opt/scala/scala-2.11.7
# 配置HADOOP
export HADOOP_HOME=/opt/hadoop/hadoop-2.7.6/
export HADOOP_CONF_DIR=/opt/hadoop/hadoop-2.7.6/etc/hadoop
#定义管理端口
export SPARK_MASTER_WEBUI_PORT=8088
#定义master域名和端口
export SPARK_MASTER_HOST=spark-master
export SPARK_MASTER_PORT=7077 # 提交Application的端口
export SPARK_MASTER_IP=10.141.211.80
# 定义work节点的管理端口
export SPARK_WORKER_WEBUI_PORT=8088
# 每一个Worker最多可以使用的cpu core的个数,真实服务器如果有32个,可以设置为32个
export SPARK_WORKER_CORES=10
# 每一个Worker最多可以使用的内存,真实服务器如果有128G,你可以设置为100G
export SPARK_WORKER_MEMORY=4g
(2)配置spark-defaults.conf
cd /opt/spark/spark-2.2.0/conf
vim spark-defaults.conf
配置如下内容
spark.eventLog.enabled=true
spark.eventLog.compress=true
# 保存在本地
# spark.eventLog.dir=file://opt/hadoop/hadoop-2.7.6/logs/userlogs
# spark.history.fs.logDirectory=file://opt/hadoop/hadoop-2.7.6/logs/userlogs
# 保存在hdfs上
spark.eventLog.dir=hdfs://spark-master:9000/tmp/logs/root/logs
spark.history.fs.logDirectory=hdfs://spark-master:9000/tmp/logs/root/logs
spark.yarn.historyServer.address=spark-master:18080
5、启动Spark
sbin/start-all.sh
二、PySpark安装
pyspark是用来对接 spark的 Python 库
pip install pyspark
三、使用pyspark
1、SparkContext声明
SparkContext是任何spark功能的入口点。
from pyspark import SparkContext
sc = SparkContext("local", "First App")
2、一些基本操作
(1)count
返回RDD中的元素个数
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize(
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"
])
counts = words.count()
print("Number of elements in RDD -> %i" % counts)
(2)collect
返回RDD中的所有元素
from pyspark import SparkContext
sc = SparkContext("local", "collect app")
words = sc.parallelize(
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"
])
coll = words.collect()
print("Elements in RDD -> %s" % coll)
(3)foreach
仅返回满足foreach内函数条件的元素。在下面的示例中,我们在foreach中调用print函数,该函数打印RDD中的所有元素。
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
def f(x): print(x)
fore = words.foreach(f)
(4)filter
返回一个包含元素的新RDD,它满足过滤器内部的功能。在下面的示例中,我们过滤掉包含''spark'的字符串。
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize(
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Fitered RDD -> %s" % (filtered))
(5)map
通过将该函数应用于RDD中的每个元素来返回新的RDD。在下面的示例中,我们形成一个键值对,并将每个字符串映射为值1
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize(
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print("Key value pair -> %s" % (mapping))
(6)reduce
执行指定的可交换和关联二元操作后,将返回RDD中的元素。在下面的示例中,我们从运算符导入add包并将其应用于'num'以执行简单的加法运算。说白了和Python的reduce一样:假如有一组整数[x1,x2,x3],利用reduce执行加法操作add,对第一个元素执行add后,结果为sum=x1,然后再将sum和x2执行add,sum=x1+x2,最后再将x2和sum执行add,此时sum=x1+x2+x3。
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding all the elements -> %i" % (adding))
(7)join
它返回RDD,其中包含一对带有匹配键的元素以及该特定键的所有值。
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print( "Join RDD -> %s" % (final))
官方文档
推荐阅读更多精彩内容Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames...Spark SQL, DataFrames and Datasets Guide Overview SQL Dat...草里有只羊阅读 17,783评论 0赞 85从0开始使用Docker搭建Spark集群最近在学习大数据技术,朋友叫我直接学习Spark,英雄不问出处,菜鸟不问对错,于是我就开始了Spark学习。 为什...Plokmijn阅读 24,789评论 6赞 26Hadoop+Hive+Spark平台搭建Mapreduce+Hive+Spark平台搭建 说明 平台搭建成功后,使用Scala语言进行算法设计和应用的开发...泽泽馥泽泽阅读 4,787评论 4赞 6spark 学习笔记Spark学习笔记 Data Source->Kafka->Spark Streaming->Parquet->S...哎哟喂喽阅读 6,342评论 0赞 51同行WITHUS(day5)有幸来参加“同行with us”的广州公开课,因为不专业,所以懵懂了一上午。临近中午散场的时候,找到刘总,想聊下心...笑笑一直在路上阅读 96评论 0赞 0这个萌炸了的欧包+茶就是汴京茶寮!汴京茶寮每批欧包上架,不用等太久,就会被一抢而空,周一至周五白天还能买得到,等到晚上或者周末,直接就是手慢无!甜口...汴京茶寮阅读 162评论 0赞 0逻辑清晰如李雨桐,为何还是躲不过薛之谦式渣男?先爱好自己,再去爱别人。 9月8号凌晨4点,薛之谦在微博高调宣布与“前妻”高磊鑫复合,同一时间,高在其微博与之遥相...杉和明阅读 443评论 0赞 1D19-时间模块1.时间模块 **2.dateTime模块____空白阅读 141评论 0赞 1评论0赞55赞6赞赞赏更多好文