[23章全]大数据硬核技能进阶 Spark3实战智能物业运营系统
参考资料1:https://pan.baidu.com/s/17TOzIVi-6yG24_QAmK60BQ 提取码: mxcp
参考资料2:https://share.weiyun.com/K1edg2Jy 密码:fcxf26
离线计算作为大数据计算领域领军技能,在成本、稳定性、数据一致性等方面有着绝对优势。吃透Spark离线技术及相关生态,就掌握了大数据工程师的高薪密码。本文章将结合生产级项目,一栈式点亮:数据收集(DataX)、数据湖(Iceberg)、数据分析(Spark)、智能调度(DS)、数据服务(DBApi)、AI大模型(ChatGPT)、可视化(Davinci)等离线处理核心技能及生态体系,带你打通硬核技能,拓宽上升通道。
首先,我们先来认识spark:
1、什么是spark
Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。
2、spark有什么用?
大数据处理和分析:Spark提供了高性能和可扩展的分布式计算能力,可以处理大规模的数据集。它支持批处理、实时流处理和交互式查询等多种数据处理模式,使得开发人员能够高效地处理和分析大数据。
数据转换和清洗:Spark提供丰富的数据处理操作和函数,如映射、过滤、聚合、排序等,使开发人员能够方便地对数据进行转换、清洗和预处理,以满足特定的数据需求。
机器学习和数据挖掘:Spark提供了机器学习库(如MLlib)和图计算库(如GraphX),支持在大规模数据集上进行机器学习和数据挖掘。开发人员可以使用Spark进行特征提取、模型训练和预测等任务。
实时流处理:Spark提供了Spark Streaming模块,支持实时数据流的处理和分析。开发人员可以使用Spark Streaming来处理实时数据流,如日志流、传感器数据流等,并进行实时计算、聚合和窗口操作等。
3、spark的使用场景有哪些
批处理:Spark可以处理大规模的数据集,并提供了丰富的数据处理和转换功能,适用于各种批处理任务,如数据清洗、ETL、数据分析等。
实时流处理:Spark的流处理模块Spark Streaming可以实时处理数据流,并提供了低延迟的处理能力,适用于实时推荐、实时分析、日志处理等应用场景。
机器学习:Spark提供了机器学习库MLlib,包括各种常用的机器学习算法和工具,可以在大规模数据上进行机器学习任务,如分类、回归、聚类、推荐等。
图计算:Spark的图计算库GraphX可以处理大规模图结构数据,并提供了各种图算法和操作,适用于社交网络分析、网络图谱等应用。
SQL查询:Spark支持使用SQL进行数据查询和分析,可以直接在Spark上运行SQL查询,与传统的关系型数据库类似,适用于数据分析和报表生成等任务。
分布式文件系统:Spark可以与分布式文件系统(如HDFS)集成,可以直接读取和处理分布式文件系统中的数据,适用于大规模数据集的处理和分析。
总的来说,Spark适用于大规模数据的处理和分析,支持多种类型的数据处理和计算任务,包括批处理、实时流处理、机器学习、图计算等领域。
4、Spark常用代码
①创建RDD方法
有两个不同的方式可以创建新的RDD
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("createWholeTextFile").setMaster("local[*]")
sc = SparkContext(conf=conf)
file_rdd = sc.textFile("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100")
print("file_rdd numpartitions {}".format(file_rdd.getNumPartitions())) # 100 100个文件100个分区
# 用于读取小文件并自动压缩分区
wholefile_rdd = sc.wholeTextFiles("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100")
print("wholefile_rdd numpartitions {}".format(wholefile_rdd.getNumPartitions())) # 2 把100个文件压缩到2个分区
result = wholefile_rdd.take(1)
# print(result) # (location, value)的形式
# 获取前面的路径
path_list = wholefile_rdd.map(lambda x: x[0]).collect()
sc.stop()
②专门读取小文件wholeTextFiles
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("createWholeTextFile").setMaster("local[*]")
sc = SparkContext(conf=conf)
file_rdd = sc.textFile("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100")
print("file_rdd numpartitions {}".format(file_rdd.getNumPartitions())) # 100 100个文件100个分区
# 用于读取小文件并自动压缩分区
wholefile_rdd = sc.wholeTextFiles("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100")
print("wholefile_rdd numpartitions {}".format(wholefile_rdd.getNumPartitions())) # 2 把100个文件压缩到2个分区
result = wholefile_rdd.take(1)
# print(result) # (location, value)的形式
# 获取前面的路径
path_list = wholefile_rdd.map(lambda x: x[0]).collect()
sc.stop()
③rdd的分区数
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
# spark入口申请资源
conf = SparkConf().setAppName("createRDD").setMaster("local[5]")
# 应该充分使用资源,线程数设置成CPU核心数的2-3倍
# conf.set("spark.default.parallelism", 10)
sc = SparkContext(conf=conf)
# 创建rdd的第一种方法
collection_rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
print(collection_rdd.collect())
# 获取分区数
print("rdd number of partitions ", collection_rdd.getNumPartitions())
# 解释:
# 设置了5个核心,默认是5个分区,如果是local[*] 默认是2个分区
# conf.set("spark.default.parallelism", 10)优先使用此值
# 如果sc.parallelize也设置了分区,那么最优先使用api设置的分区数
# 如果是读取文件夹下面的文件,sc.textFile, minPartitions失效,有多少个文件就有多少个分区,下面100个文件返回了100个分区
file_rdd = sc.textFile("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100",
minPartitions=3)
print("file_rdd numpartitions {}".format(file_rdd.getNumPartitions())) # 100 100个文件100个分区
# 用于读取小文件并自动压缩分区,minPartitions参数是生效的。
wholefile_rdd = sc.wholeTextFiles("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100",
minPartitions=3)
print("wholefile_rdd numpartitions {}".format(wholefile_rdd.getNumPartitions())) # 2 把100个文件压缩到3个分区
# 打印不同分区数据
collection_rdd = sc.parallelize([1, 2, 3, 4, 5, 6], numSlices=7)
print("collection_rdd number of partitions ", collection_rdd.getNumPartitions())
# 6个数据7个分区,有一个分区是空的 per partition content [[], [1], [2], [3], [4], [5], [6]]
print("per partition content", collection_rdd.glom().collect())
# 关闭spark context
sc.stop()
有疑问加站长微信联系(非本文作者)