[23章全]大数据硬核技能进阶 Spark3实战智能物业运营系统

kaudmands · · 327 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

[23章全]大数据硬核技能进阶 Spark3实战智能物业运营系统 参考资料1:https://pan.baidu.com/s/17TOzIVi-6yG24_QAmK60BQ 提取码: mxcp 参考资料2:https://share.weiyun.com/K1edg2Jy 密码:fcxf26 离线计算作为大数据计算领域领军技能,在成本、稳定性、数据一致性等方面有着绝对优势。吃透Spark离线技术及相关生态,就掌握了大数据工程师的高薪密码。本文章将结合生产级项目,一栈式点亮:数据收集(DataX)、数据湖(Iceberg)、数据分析(Spark)、智能调度(DS)、数据服务(DBApi)、AI大模型(ChatGPT)、可视化(Davinci)等离线处理核心技能及生态体系,带你打通硬核技能,拓宽上升通道。 首先,我们先来认识spark: 1、什么是spark Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。 2、spark有什么用?  大数据处理和分析:Spark提供了高性能和可扩展的分布式计算能力,可以处理大规模的数据集。它支持批处理、实时流处理和交互式查询等多种数据处理模式,使得开发人员能够高效地处理和分析大数据。   数据转换和清洗:Spark提供丰富的数据处理操作和函数,如映射、过滤、聚合、排序等,使开发人员能够方便地对数据进行转换、清洗和预处理,以满足特定的数据需求。   机器学习和数据挖掘:Spark提供了机器学习库(如MLlib)和图计算库(如GraphX),支持在大规模数据集上进行机器学习和数据挖掘。开发人员可以使用Spark进行特征提取、模型训练和预测等任务。   实时流处理:Spark提供了Spark Streaming模块,支持实时数据流的处理和分析。开发人员可以使用Spark Streaming来处理实时数据流,如日志流、传感器数据流等,并进行实时计算、聚合和窗口操作等。 3、spark的使用场景有哪些 批处理:Spark可以处理大规模的数据集,并提供了丰富的数据处理和转换功能,适用于各种批处理任务,如数据清洗、ETL、数据分析等。 实时流处理:Spark的流处理模块Spark Streaming可以实时处理数据流,并提供了低延迟的处理能力,适用于实时推荐、实时分析、日志处理等应用场景。 机器学习:Spark提供了机器学习库MLlib,包括各种常用的机器学习算法和工具,可以在大规模数据上进行机器学习任务,如分类、回归、聚类、推荐等。 图计算:Spark的图计算库GraphX可以处理大规模图结构数据,并提供了各种图算法和操作,适用于社交网络分析、网络图谱等应用。 SQL查询:Spark支持使用SQL进行数据查询和分析,可以直接在Spark上运行SQL查询,与传统的关系型数据库类似,适用于数据分析和报表生成等任务。 分布式文件系统:Spark可以与分布式文件系统(如HDFS)集成,可以直接读取和处理分布式文件系统中的数据,适用于大规模数据集的处理和分析。 总的来说,Spark适用于大规模数据的处理和分析,支持多种类型的数据处理和计算任务,包括批处理、实时流处理、机器学习、图计算等领域。 4、Spark常用代码 ①创建RDD方法 有两个不同的方式可以创建新的RDD from pyspark import SparkConf, SparkContext conf = SparkConf().setAppName("createWholeTextFile").setMaster("local[*]") sc = SparkContext(conf=conf) file_rdd = sc.textFile("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100") print("file_rdd numpartitions {}".format(file_rdd.getNumPartitions())) # 100 100个文件100个分区 # 用于读取小文件并自动压缩分区 wholefile_rdd = sc.wholeTextFiles("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100") print("wholefile_rdd numpartitions {}".format(wholefile_rdd.getNumPartitions())) # 2 把100个文件压缩到2个分区 result = wholefile_rdd.take(1) # print(result) # (location, value)的形式 # 获取前面的路径 path_list = wholefile_rdd.map(lambda x: x[0]).collect() sc.stop() ②专门读取小文件wholeTextFiles from pyspark import SparkConf, SparkContext conf = SparkConf().setAppName("createWholeTextFile").setMaster("local[*]") sc = SparkContext(conf=conf) file_rdd = sc.textFile("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100") print("file_rdd numpartitions {}".format(file_rdd.getNumPartitions())) # 100 100个文件100个分区 # 用于读取小文件并自动压缩分区 wholefile_rdd = sc.wholeTextFiles("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100") print("wholefile_rdd numpartitions {}".format(wholefile_rdd.getNumPartitions())) # 2 把100个文件压缩到2个分区 result = wholefile_rdd.take(1) # print(result) # (location, value)的形式 # 获取前面的路径 path_list = wholefile_rdd.map(lambda x: x[0]).collect() sc.stop() ③rdd的分区数 from pyspark import SparkConf, SparkContext if __name__ == '__main__': # spark入口申请资源 conf = SparkConf().setAppName("createRDD").setMaster("local[5]") # 应该充分使用资源,线程数设置成CPU核心数的2-3倍 # conf.set("spark.default.parallelism", 10) sc = SparkContext(conf=conf) # 创建rdd的第一种方法 collection_rdd = sc.parallelize([1, 2, 3, 4, 5, 6]) print(collection_rdd.collect()) # 获取分区数 print("rdd number of partitions ", collection_rdd.getNumPartitions()) # 解释: # 设置了5个核心,默认是5个分区,如果是local[*] 默认是2个分区 # conf.set("spark.default.parallelism", 10)优先使用此值 # 如果sc.parallelize也设置了分区,那么最优先使用api设置的分区数 # 如果是读取文件夹下面的文件,sc.textFile, minPartitions失效,有多少个文件就有多少个分区,下面100个文件返回了100个分区 file_rdd = sc.textFile("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100", minPartitions=3) print("file_rdd numpartitions {}".format(file_rdd.getNumPartitions())) # 100 100个文件100个分区 # 用于读取小文件并自动压缩分区,minPartitions参数是生效的。 wholefile_rdd = sc.wholeTextFiles("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100", minPartitions=3) print("wholefile_rdd numpartitions {}".format(wholefile_rdd.getNumPartitions())) # 2 把100个文件压缩到3个分区 # 打印不同分区数据 collection_rdd = sc.parallelize([1, 2, 3, 4, 5, 6], numSlices=7) print("collection_rdd number of partitions ", collection_rdd.getNumPartitions()) # 6个数据7个分区,有一个分区是空的 per partition content [[], [1], [2], [3], [4], [5], [6]] print("per partition content", collection_rdd.glom().collect()) # 关闭spark context sc.stop()

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

327 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传