一、Linux Shell 巩固复习昨天的 grep / find / wc / du 四个大数据高频命令写一个小脚本统计当前目录下所有 .py 文件总行数#!/bin/bashtotal0# 添加总计变量forfilein*.py;doif[-f$file];thenlines$(wc-l$file)echo文件:$file行数:$linestotal$((totallines))# 累加行数fidoneecho------------------------echo总行数:$total二、SQL 高强度训练第二高的薪水去最大取最大selectmax(salary)asSecondHighestSalaryfromemployeewheresalary(selectmax(salary)fromemployee)第 N 高的薪水CREATEFUNCTIONgetNthHighestSalary(NINT)RETURNSINTBEGINsetnn-1;RETURN(selectdistinctsalaryfromemployeeorderbysalarydesclimitn,1);ENDDISTINCT去除重复的薪水ORDER BY salary DESC按薪水降序排列最高在前LIMIT offset, count: offset跳过的记录数即 n n-1: count取 1 条记录分数排名窗口函数用法窗口函数()OVER(PARTITIONBY分组字段-- 可选按什么分组ORDERBY排序字段-- 必需按什么排序)AS别名rank()跳号1,2,2,4并列占位下一个跳号dense_rank()不跳号1,2,2,3并列不占位连续排名row_number()唯一连续1,2,3,4不重复即使分数相同也分先后selectscore,dense_rank()over(orderbyscoredesc)asrankfromscoresRANK 是 MySQL 的保留关键字不能直接作为列别名除非加反引号连续出现的数字WITHconsecutive_checkAS(SELECTnum,LAG(num,1)OVER(ORDERBYid)ASprev_num,LAG(num,2)OVER(ORDERBYid)ASprev_prev_num,LEAD(num,1)OVER(ORDERBYid)ASnext_num,LEAD(num,2)OVER(ORDERBYid)ASnext_next_numFROMLogs)SELECTDISTINCTnumASConsecutiveNumsFROMconsecutive_checkWHERE(numprev_numANDnumprev_prev_num)-- 当前与前两个相同OR(numprev_numANDnumnext_num)-- 当前与前一个、后一个相同OR(numnext_numANDnumnext_next_num);-- 当前与后两个相同三、PySpark 入门第一个真正的程序创建 SparkSession读取一个本地 CSV自己随便造 5 行数据用 Spark SQL 做一次查询筛选、计数、分组打印结果frompyspark.sqlimportSparkSessionimportpandasaspd# 先创建测试CSV文件5行数据test_datapd.DataFrame({name:[张三,李四,王五,赵六,小明],age:[25,18,30,22,19],city:[北京,上海,北京,广州,上海],salary:[8000,5000,12000,7000,6000]})test_data.to_csv(test.csv,indexFalse,encodingutf-8)# 1. 创建SparkSessionsparkSparkSession.builder \.appName(Day3_Test)\.master(local[*])\.getOrCreate()# 2. 读取CSVdfspark.read.csv(test.csv,headerTrue,inferSchemaTrue)# 3. 查看数据print(原始数据)df.show()df.printSchema()# 4. 使用Spark SQL进行查询# 方式1使用DataFrame APIprint(\n方式1 - DataFrame API筛选年龄20按城市分组计数)result1df.filter(age 20).groupBy(city).count()result1.show()# 方式2注册临时视图使用纯SQLdf.createOrReplaceTempView(people)print(\n方式2 - 纯SQL语法筛选年龄20按城市分组计数)result2spark.sql( SELECT city, COUNT(*) as count FROM people WHERE age 20 GROUP BY city ORDER BY count DESC )result2.show()# 额外示例查询平均薪资print(\n额外查询各城市平均薪资年龄20)result3spark.sql( SELECT city, AVG(salary) as avg_salary FROM people WHERE age 20 GROUP BY city )result3.show()# 停止SparkSessionspark.stop()SparkSession 是什么在 PySpark 中SparkSession 是一个统一的入口点用于与 Spark 进行交互。它是 Spark 2.0 引入的新特性替代了旧版中的 SparkContext、SQLContext 和 HiveContext 等。SparkSession 为用户提供了一个单一的接口方便使用 Spark 的各种功能包括 DataFrame、SQL、Streaming 等。从代码示例可以看出创建 SparkSession 对象的方式通常如下frompyspark.sqlimportSparkSession sparkSparkSession.builder \.appName(Python Spark SQL basic example)\.config(spark.some.config.option,some-value)\.getOrCreate()在上述代码中SparkSession.builder 用于构建 SparkSession 对象appName 方法用于设置应用程序的名称config 方法用于设置 Spark 的配置选项getOrCreate 方法用于获取已存在的 SparkSession 对象如果不存在则创建一个新的对象。此外在使用 SparkSession 时可以利用它读取文件如读取 CSV 文件df_csvss.read.csv(path/test/stu.csv,sep,,inferSchemaTrue,schemaname string,age int,gender string,phone string,email string,city string,address string)这里的 ss 是 SparkSession 对象通过 read.csv 方法读取 CSV 文件并创建 DataFrame。DataFrame 是什么在 PySpark 中DataFrame 是组织成命名列named columns的分布式数据集合。它在概念上等同于关系数据库中的表或 R/Python 中的数据框但在幕后做了更丰富的优化。DataFrames 可以从多种来源构建例如结构化数据文件、Hive 中的表、外部数据库或现有 RDD。以下是一个简单的示例展示如何使用 SparkSession 读取 CSV 文件并创建 DataFramefrompyspark.sqlimportSparkSession# 创建 SparkSessionsparkSparkSession.builder \.appName(Python Spark SQL basic example)\.getOrCreate()# 读取 CSV 文件fileDFspark.read.csv(hdfs://tmp/ratings.csv,sep,,headerTrue)# 显示 DataFrame 内容fileDF.show()# 获取前 2 行print(fileDF.head(2))Spark SQL 和普通 SQL 几乎一样相同点在标准 SQL 语法的基础功能上Spark SQL 和普通 SQL 是一致的都支持基础的 CRUD创建、读取、更新、删除和聚合操作等不同点主要体现在以下几个方面设计定位普通 SQL 是传统关系型数据库的标准查询语言用于 OLTP事务处理和简单 OLAP分析处理语法标准化但功能受限缺乏分布式计算能力而 Spark SQL 是基于 Spark 引擎的分布式 SQL 实现扩展了标准 SQL 语法支持批处理和微批流处理强调与 Spark 生态如 MLlib、GraphX的集成。处理模型普通 SQL 采用单机/主从架构执行模式下延迟为毫秒级OLTP 场景仅处理时间Spark SQL 采用分布式微批流或批处理延迟为秒级微批以处理时间为主。语法与功能扩展普通 SQL 的窗口函数如 OVER 子句需特定数据库支持如 Oracle、PostgreSQLSpark SQL 扩展了 UDF用户定义函数、UDAF用户定义聚合函数、窗口函数如 TUMBLE还支持 DataFrame API 和 Catalyst 优化器。示例代码以下是一个简单的普通 SQL 查询示例和对应的 Spark SQL 查询示例-- 普通 SQL 查询示例假设使用 MySQLSELECTCOUNT(*)FROMusersWHEREage18;# Spark SQL 查询示例frompyspark.sqlimportSparkSession sparkSparkSession.builder.appName(SparkSQLExample).getOrCreate()data[(Alice,25),(Bob,20),(Charlie,17)]columns[name,age]dfspark.createDataFrame(data,columns)df.createOrReplaceTempView(users)resultspark.sql(SELECT COUNT(*) FROM users WHERE age 18)result.show()四、算法题合并两个有序链表# Definition for singly-linked list.classListNode:def__init__(self,val0,nextNone):self.valval self.nextnextclassSolution:defmergeTwoLists(self,l1,l2):# 创建虚拟头节点dummyListNode(0)currentdummy# 遍历两个链表whilel1andl2:ifl1.vall2.val:current.nextl1 l1l1.nextelse:current.nextl2 l2l2.nextcurrentcurrent.next# 连接剩余部分ifl1:current.nextl1ifl2:current.nextl2returndummy.next