
Spark - COVID-19 Case Study, Demonstrated in Scala, Java, and Python

Tags: big data processing, Spark, Scala, Java


I. The COVID-19 case study

An earlier article worked through this COVID-19 dataset with MapReduce; this article analyzes the same dataset with Spark. The MapReduce article can be found here:

https://blog.csdn.net/qq_43692950/article/details/127475811

The data format is as follows:

date, county, state, fips (county code), cases (cumulative confirmed cases), deaths (cumulative deaths)
2021-01-28,Pike,Alabama,01109,2704,35
2021-01-28,Randolph,Alabama,01111,1505,37
2021-01-28,Russell,Alabama,01113,3675,16
2021-01-28,Shelby,Alabama,01117,19878,141
2021-01-28,St. Clair,Alabama,01115,8047,147
2021-01-28,Sumter,Alabama,01119,925,28
2021-01-28,Talladega,Alabama,01121,6711,114
2021-01-28,Tallapoosa,Alabama,01123,3258,112
2021-01-28,Tuscaloosa,Alabama,01125,22083,283
2021-01-28,Walker,Alabama,01127,6105,185
2021-01-28,Washington,Alabama,01129,1454,27

Dataset download:

https://download.csdn.net/download/qq_43692950/86805389
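
To make the field positions concrete, here is a minimal sketch (plain Scala, no Spark required; the `ParseRecordSketch` object name is only for illustration) that pulls out the state, cases, and deaths fields from one of the sample records shown above, the same three fields that every job below relies on:

object ParseRecordSketch {
  def main(args: Array[String]): Unit = {
    // one record from the sample data above: date,county,state,fips,cases,deaths
    val record = "2021-01-28,Pike,Alabama,01109,2704,35"
    val fields = record.split(",")
    val state  = fields(2)        // "Alabama"
    val cases  = fields(4).toLong // 2704
    val deaths = fields(5).toLong // 35
    println(s"$state cases=$cases deaths=$deaths")
  }
}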

II. Computing the cumulative cases and deaths for each state

Approach:

  1. Read the dataset
  2. Convert each record to a key/value pair (key: state, value: the county's cases and deaths)
  3. Aggregate by key, summing cases and deaths
  4. Sort by state and print the result
  • Scala:
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}

import java.util.Objects

object CovidSum {

  case class CovidVO(cases: Long, deaths: Long) extends java.io.Serializable

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val t1 = sc.textFile("D:/test/input/")
    t1.filter(StringUtils.isNotBlank)
      .map(s => {
        // record layout: date,county,state,fips,cases,deaths
        val line = s.split(",")
        if (line.size >= 6) {
          val state = line(2)
          val cases = line(4).toLong
          val deaths = line(5).toLong
          (state, CovidVO(cases, deaths))
        } else {
          null
        }
      }).filter(Objects.nonNull)
      // sum cases and deaths per state
      .reduceByKey((v1, v2) => CovidVO(v1.cases + v2.cases, v1.deaths + v2.deaths))
      // sort by state name into a single partition
      .sortByKey(ascending = true, numPartitions = 1)
      .foreach(t => {
        println(t._1 + " " + t._2.cases + " " + t._2.deaths)
      })
  }
}
  • Java:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class CvoidSum {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class CovidVO implements Serializable {
        private Long cases;
        private Long deaths;
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("spark").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaRDD<String> t1 = sc.textFile("D:/test/input/");
        t1.filter(StringUtils::isNotBlank)
                .mapToPair(s -> {
                    // record layout: date,county,state,fips,cases,deaths
                    List<String> line = Arrays.asList(s.split(","));
                    if (line.size() >= 6) {
                        String state = line.get(2);
                        Long cases = Long.parseLong(line.get(4));
                        Long deaths = Long.parseLong(line.get(5));
                        return new Tuple2<>(state, new CovidVO(cases, deaths));
                    }
                    return null;
                }).filter(Objects::nonNull)
                // sum cases and deaths per state
                .reduceByKey((v1, v2) -> new CovidVO(v1.getCases() + v2.getCases(), v1.getDeaths() + v2.getDeaths()))
                // sort by state name into a single partition
                .sortByKey(true, 1)
                .foreach(t -> {
                    System.out.println(t._1 + " " + t._2.getCases() + " " + t._2.getDeaths());
                });
    }
}
  • Python:
from pyspark import SparkConf, SparkContext
import findspark

if __name__ == '__main__':
    findspark.init()
    conf = SparkConf().setAppName('spark').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")

    t1 = sc.textFile("D:/test/input/")

    def mapT(s):
        line = s.split(",")
        if (len(line) >= 6):
            state = line[2]
            casesStr = line[4]
            deathsStr = line[5]
            cases = int(casesStr if (casesStr and casesStr != '') else '0')
            deaths = int(deathsStr if (deathsStr and deathsStr != '') else '0')
            return (state, (cases, deaths))
        else:
            return None

    t1.filter(lambda s: s and s != '') \
        .map(mapT) \
        .filter(lambda s: s is not None) \
        .reduceByKey(lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1])) \
        .sortByKey(ascending=True, numPartitions=1) \
        .foreach(lambda t: print(t[0] + " " + str(t[1][0]) + " " + str(t[1][1])))

Results:

[screenshot of the job output]

III. Computing the cumulative cases and deaths for each state, sorted by deaths in descending order

Approach:

  1. Read the dataset
  2. Convert each record to a key/value pair (key: state, value: the county's cases and deaths)
  3. Aggregate by key, summing cases and deaths
  4. Sort by the deaths field of the value in descending order and print the result
  • Scala:
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}

import java.util.Objects

object CovidSumSortDeaths {

  case class CovidVO(cases: Long, deaths: Long) extends java.io.Serializable

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val t1 = sc.textFile("D:/test/input/")
    t1.filter(StringUtils.isNotBlank)
      .map(s => {
        // record layout: date,county,state,fips,cases,deaths
        val line = s.split(",")
        if (line.size >= 6) {
          val state = line(2)
          val cases = line(4).toLong
          val deaths = line(5).toLong
          (state, CovidVO(cases, deaths))
        } else {
          null
        }
      }).filter(Objects.nonNull)
      // sum cases and deaths per state
      .reduceByKey((v1, v2) => CovidVO(v1.cases + v2.cases, v1.deaths + v2.deaths))
      // sort by the deaths field of the value, descending, into a single partition
      .sortBy(t => t._2.deaths, ascending = false, numPartitions = 1)
      .foreach(t => {
        println(t._1 + " " + t._2.cases + " " + t._2.deaths)
      })
  }
}
  • Java:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class CvoidSumSortDeaths {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class CovidVO implements Serializable {
        private Long cases;
        private Long deaths;
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("spark").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaRDD<String> t1 = sc.textFile("D:/test/input/");
        t1.filter(StringUtils::isNotBlank)
                .mapToPair(s -> {
                    // record layout: date,county,state,fips,cases,deaths
                    List<String> line = Arrays.asList(s.split(","));
                    if (line.size() >= 6) {
                        String state = line.get(2);
                        Long cases = Long.parseLong(line.get(4));
                        Long deaths = Long.parseLong(line.get(5));
                        return new Tuple2<>(state, new CovidVO(cases, deaths));
                    }
                    return null;
                }).filter(Objects::nonNull)
                // sum cases and deaths per state
                .reduceByKey((v1, v2) -> new CovidVO(v1.getCases() + v2.getCases(), v1.getDeaths() + v2.getDeaths()))
                // back to a plain RDD of tuples so sortBy can order by the value
                .map(t -> new Tuple2<String, CovidVO>(t._1, t._2))
                // sort by deaths, descending, into a single partition
                .sortBy(t -> t._2.getDeaths(), false, 1)
                .foreach(t -> {
                    System.out.println(t._1 + " " + t._2.getCases() + " " + t._2.getDeaths());
                });
    }
}
  • Python:
from pyspark import SparkConf, SparkContext
import findspark

if __name__ == '__main__':
    findspark.init()
    conf = SparkConf().setAppName('spark').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")

    t1 = sc.textFile("D:/test/input/")

    def mapT(s):
        line = s.split(",")
        if (len(line) >= 6):
            state = line[2]
            casesStr = line[4]
            deathsStr = line[5]
            cases = int(casesStr if (casesStr and casesStr != '') else '0')
            deaths = int(deathsStr if (deathsStr and deathsStr != '') else '0')
            return (state, (cases, deaths))
        else:
            return None

    t1.filter(lambda s: s and s != '') \
        .map(mapT) \
        .filter(lambda s: s is not None) \
        .reduceByKey(lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1])) \
        .sortBy(lambda t: t[1][1], ascending=False, numPartitions=1) \
        .foreach(lambda t: print(t[0] + " " + str(t[1][0]) + " " + str(t[1][1])))

Results:

[screenshot of the job output]

IV. Computing the top-3 counties by deaths within each state

Approach:

  1. Read the dataset
  2. Convert each record to a key/value pair (key: state, value: the county together with its cases and deaths)
  3. Aggregate the values for each key
  4. Keep a fixed-length array of 3 that tracks the current top-3 values on the fly, which avoids the shuffle memory cost of a full sort (a small standalone sketch of this insertion logic follows this list)
  5. Compute the top-3 values within each partition first
  6. Merge the per-partition top-3 arrays to obtain the final top-3 for each key
  7. Sort by state and print the result
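
Before the full jobs, here is a small standalone sketch of the fixed-length top-3 buffer described in step 4. It is plain Scala with hypothetical (county, deaths) values rather than real dataset records, and the `Top3Sketch` and `insert` names are only for illustration; the `comparison` functions in the jobs below apply the same insertion idea per state:

object Top3Sketch {
  // insert a candidate into a fixed-length top-3 buffer kept in descending order of deaths
  def insert(top: Array[(String, Long)], cur: (String, Long)): Array[(String, Long)] = {
    var i = 0
    var done = false
    while (i < top.length && !done) {
      if (top(i) == null) {                 // empty slot: just take it
        top(i) = cur; done = true
      } else if (cur._2 > top(i)._2) {      // larger deaths: shift the tail down and insert
        for (j <- top.length - 1 until i by -1) top(j) = top(j - 1)
        top(i) = cur; done = true
      } else i += 1
    }
    top
  }

  def main(args: Array[String]): Unit = {
    val top = new Array[(String, Long)](3)
    // hypothetical (county, deaths) values, not taken from the real dataset
    Seq(("A", 10L), ("B", 40L), ("C", 25L), ("D", 5L), ("E", 30L)).foreach(insert(top, _))
    println(top.filter(_ != null).mkString(", ")) // (B,40), (E,30), (C,25)
  }
}

Each candidate either fills an empty slot or, if its deaths value beats an existing entry, shifts the smaller entries down one slot and takes that position; anything pushed off the end is discarded, so no more than three values per key are ever held.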
  • Scala:
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}

import java.util.Objects

object CovidTopDeaths {

  case class CovidVO(county: String, cases: Long, deaths: Long) extends java.io.Serializable

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val t1 = sc.textFile("D:/test/input/")
    t1.filter(StringUtils.isNotBlank)
      .map(s => {
        // record layout: date,county,state,fips,cases,deaths
        val line = s.split(",")
        if (line.size >= 6) {
          val state = line(2)
          val county = line(1)
          val cases = line(4).toLong
          val deaths = line(5).toLong
          (state, CovidVO(county, cases, deaths))
        } else {
          null
        }
      }).filter(Objects.nonNull)
      .aggregateByKey(new Array[CovidVO](3))(
        // fold one record into a partition's top-3 buffer
        (zeroValue: Array[CovidVO], currentValue: CovidVO) => {
          comparison(zeroValue, currentValue)
        },
        // merge the per-partition top-3 buffers
        (topP1: Array[CovidVO], topP2: Array[CovidVO]) => {
          var top = topP2
          // skip empty slots left by partitions that held fewer than 3 records
          for (topP1Value <- topP1 if topP1Value != null) {
            top = comparison(top, topP1Value)
          }
          top
        }
      ).sortByKey(ascending = true, numPartitions = 1)
      .foreach(t => {
        for (topValue <- t._2) {
          if (Objects.nonNull(topValue)) println(t._1 + " " + topValue)
        }
      })
  }

  // insert currentValue into the length-3 buffer, kept in descending order of deaths
  def comparison(topValue: Array[CovidVO], currentValue: CovidVO): Array[CovidVO] = {
    var flag = true
    for (i <- topValue.indices if flag) {
      if (topValue(i) == null) {
        topValue(i) = currentValue
        flag = false
      } else if (currentValue.deaths > topValue(i).deaths) {
        // shift the smaller entries down one slot and insert
        for (j <- topValue.length - 1 to i + 1 by -1) {
          topValue(j) = topValue(j - 1)
        }
        topValue(i) = currentValue
        flag = false
      }
    }
    topValue
  }

}
  • Java:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class CvoidTopDeaths {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class CovidVO implements Serializable {
        private String county;
        private Long cases;
        private Long deaths;
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("spark").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaRDD<String> t1 = sc.textFile("D:/test/input/");
        t1.filter(StringUtils::isNotBlank)
                .mapToPair(s -> {
                    // record layout: date,county,state,fips,cases,deaths
                    List<String> line = Arrays.asList(s.split(","));
                    if (line.size() >= 6) {
                        String state = line.get(2);
                        String county = line.get(1);
                        Long cases = Long.parseLong(line.get(4));
                        Long deaths = Long.parseLong(line.get(5));
                        return new Tuple2<>(state, new CovidVO(county, cases, deaths));
                    }
                    return null;
                }).filter(Objects::nonNull)
                .aggregateByKey(
                        // length-3 buffer holding the current top-3 per key
                        new CovidVO[3],
                        // fold one record into a partition's buffer
                        CvoidTopDeaths::comparison,
                        // merge the per-partition buffers
                        (topP1, topP2) -> {
                            for (CovidVO topP1Value : topP1) {
                                // skip empty slots left by partitions that held fewer than 3 records
                                if (topP1Value != null) {
                                    topP2 = comparison(topP2, topP1Value);
                                }
                            }
                            return topP2;
                        })
                .sortByKey(true, 1)
                .foreach(t -> {
                    for (CovidVO topValue : t._2) {
                        if (Objects.nonNull(topValue)) {
                            System.out.println(t._1 + " " + topValue);
                        }
                    }
                });
    }

    // insert currentValue into the length-3 buffer, kept in descending order of deaths
    private static CovidVO[] comparison(CovidVO[] topValue, CovidVO currentValue) {
        for (int i = 0; i < topValue.length; i++) {
            if (topValue[i] == null) {
                topValue[i] = currentValue;
                break;
            } else if (currentValue.deaths > topValue[i].deaths) {
                // shift the smaller entries down one slot and insert
                for (int j = topValue.length - 1; j > i; j--) {
                    topValue[j] = topValue[j - 1];
                }
                topValue[i] = currentValue;
                break;
            }
        }
        return topValue;
    }

}
  • Python:
from pyspark import SparkConf, SparkContext
import findspark

if __name__ == '__main__':
    findspark.init()
    conf = SparkConf().setAppName('spark').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")

    t1 = sc.textFile("D:/test/input/")


    def mapT(s):
        line = s.split(",")
        if (len(line) >= 6):
            state = line[2]
            county = line[1]
            casesStr = line[4]
            deathsStr = line[5]
            cases = int(casesStr if (casesStr and casesStr != '') else '0')
            deaths = int(deathsStr if (deathsStr and deathsStr != '') else '0')
            return (state, (county, cases, deaths))
        else:
            return None


    def comparison(topValue, currentValue):
        # insert currentValue into the length-3 buffer, kept in descending order of deaths
        for i in range(0, 3):
            if topValue[i] is None:
                topValue[i] = currentValue
                break
            elif currentValue[2] > topValue[i][2]:
                # shift the smaller entries down one slot and insert
                for j in range(len(topValue) - 1, i, -1):
                    topValue[j] = topValue[j - 1]
                topValue[i] = currentValue
                break
        return topValue


    def comparison2(topP1, topP2):
        for v in topP1:
            # skip empty slots left by partitions that held fewer than 3 records
            if v is not None:
                topP2 = comparison(topP2, v)
        return topP2


    def printValue(top):
        for t in top[1]:
            if t: print(top[0], t[0], t[1], t[2])


    t1.filter(lambda s: s and s != '') \
        .map(mapT) \
        .filter(lambda s: s is not None) \
        .aggregateByKey([None, None, None], comparison, comparison2) \
        .sortByKey(True, 1) \
        .foreach(printValue)

Results:

[screenshot of the job output]

V. Computing the average cases and deaths for each state, sorted by average deaths in descending order

Approach:

  1. Read the dataset
  2. Convert each record to a key/value pair (key: state, value: the county's cases and deaths)
  3. Aggregate by key, summing cases and deaths
  4. Declare an initial value with cases and deaths set to 0, plus a counter initialized to 0
  5. First compute the sum of cases and deaths, together with the record count, within each partition
  6. Add up the per-partition sums and counts
  7. Divide the combined cases and deaths by the total count to get the average for each key (a minimal sketch of this sum-and-count pattern follows this list)
  8. Sort by the average deaths in the value in descending order and print the result
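
For reference, here is a minimal Spark sketch of the sum-and-count pattern described in steps 4 to 7, using hypothetical toy (state, deaths) values rather than the real dataset; the `AvgSketch` object name and the sample numbers are only for illustration. The jobs below apply the same pattern, just with a pair of running sums (cases and deaths) in the accumulator:

import org.apache.spark.{SparkConf, SparkContext}

object AvgSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("avg-sketch").setMaster("local[*]"))
    sc.setLogLevel("WARN")

    // hypothetical (state, deaths) values, not taken from the real dataset
    val data = sc.parallelize(Seq(("A", 10.0), ("A", 20.0), ("B", 6.0)))

    data
      // accumulator = (running sum, record count)
      .aggregateByKey((0.0, 0))(
        (acc, v) => (acc._1 + v, acc._2 + 1),          // fold a value into a partition's accumulator
        (a, b) => (a._1 + b._1, a._2 + b._2)           // merge accumulators across partitions
      )
      .mapValues { case (sum, count) => sum / count }  // average per key
      .collect()
      .foreach { case (k, avg) => println(s"$k $avg") } // prints A 15.0 and B 6.0 (order may vary)

    sc.stop()
  }
}

Keeping the count inside the accumulator is what makes the per-partition results mergeable; averaging within each partition and then averaging those averages would weight small partitions too heavily.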
  • Scala:
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}

import java.util.Objects

object CovidAvgSortDeaths {

  case class CovidVO(cases: Double, deaths: Double) extends java.io.Serializable

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val t1 = sc.textFile("D:/test/input/")
    t1.filter(StringUtils.isNotBlank)
      .map(s => {
        // record layout: date,county,state,fips,cases,deaths
        val line = s.split(",")
        if (line.size >= 6) {
          val state = line(2)
          val cases = line(4).toDouble
          val deaths = line(5).toDouble
          (state, CovidVO(cases, deaths))
        } else {
          null
        }
      }).filter(Objects.nonNull)
      // accumulator = (running sums, record count)
      .aggregateByKey((CovidVO(0D, 0D), 0))(
        (zeroValue, currentValue) => {
          (CovidVO(zeroValue._1.cases + currentValue.cases, zeroValue._1.deaths + currentValue.deaths), zeroValue._2 + 1)
        },
        (topP1, topP2) => {
          (CovidVO(topP1._1.cases + topP2._1.cases, topP1._1.deaths + topP2._1.deaths), topP1._2 + topP2._2)
        }
      )
      // divide the sums by the count to get the per-state averages
      .map(t => (t._1, CovidVO(t._2._1.cases / t._2._2, t._2._1.deaths / t._2._2)))
      // sort by average deaths, descending, into a single partition
      .sortBy(t => t._2.deaths, ascending = false, numPartitions = 1)
      .foreach(t => {
        println(t._1 + " " + t._2)
      })
  }

}
  • Java:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class CvoidAvgSortDeaths {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class CovidVO implements Serializable {
        private Double cases;
        private Double deaths;
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("spark").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaRDD<String> t1 = sc.textFile("D:/test/input/");
        t1.filter(StringUtils::isNotBlank)
                .mapToPair(s -> {
                    // record layout: date,county,state,fips,cases,deaths
                    List<String> line = Arrays.asList(s.split(","));
                    if (line.size() >= 6) {
                        String state = line.get(2);
                        Double cases = Double.parseDouble(line.get(4));
                        Double deaths = Double.parseDouble(line.get(5));
                        return new Tuple2<>(state, new CovidVO(cases, deaths));
                    }
                    return null;
                }).filter(Objects::nonNull)
                .aggregateByKey(
                        // accumulator = (running sums, record count)
                        new Tuple2<CovidVO, Integer>(new CovidVO(0D, 0D), 0),
                        // fold one record into a partition's accumulator
                        (zeroValue, currentValue) -> {
                            CovidVO v = zeroValue._1;
                            v.setCases(v.getCases() + currentValue.getCases());
                            v.setDeaths(v.getDeaths() + currentValue.getDeaths());
                            return new Tuple2<>(v, zeroValue._2 + 1);
                        },
                        // merge the per-partition accumulators
                        (p1, p2) -> {
                            CovidVO v1 = p1._1;
                            CovidVO v2 = p2._1;
                            v1.setCases(v1.getCases() + v2.getCases());
                            v1.setDeaths(v1.getDeaths() + v2.getDeaths());
                            return new Tuple2<>(v1, p1._2 + p2._2);
                        })
                // divide the sums by the count to get the per-state averages
                .map(t -> {
                    CovidVO v = t._2._1;
                    v.setCases(v.getCases() / t._2._2);
                    v.setDeaths(v.getDeaths() / t._2._2);
                    return new Tuple2<String, CovidVO>(t._1, v);
                })
                // sort by average deaths, descending, into a single partition
                .sortBy(t -> t._2.getDeaths(), false, 1)
                .foreach(t -> {
                    System.out.println(t._1 + " " + t._2);
                });
    }
}
  • Python:
from pyspark import SparkConf, SparkContext
import findspark

if __name__ == '__main__':
    findspark.init()
    conf = SparkConf().setAppName('spark').setMaster('local[*]')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")

    t1 = sc.textFile("D:/test/input/")


    def mapT(s):
        line = s.split(",")
        if (len(line) >= 6):
            state = line[2]
            casesStr = line[4]
            deathsStr = line[5]
            cases = float(casesStr if (casesStr and casesStr != '') else '0')
            deaths = float(deathsStr if (deathsStr and deathsStr != '') else '0')
            return (state, (cases, deaths))
        else:
            return None


    t1.filter(lambda s: s and s != '') \
        .map(mapT) \
        .filter(lambda s: s is not None) \
        .aggregateByKey((0.0, 0.0, 0),
                        lambda zeroValue, currentValue: (zeroValue[0] + currentValue[0],
                                                         zeroValue[1] + currentValue[1],
                                                         zeroValue[2] + 1),
                        lambda p1, p2: (p1[0] + p2[0], p1[1] + p2[1], p1[2] + p2[2])) \
        .map(lambda t: (t[0], t[1][0] / t[1][2], t[1][1] / t[1][2])) \
        .sortBy(lambda t: t[2], ascending=False, numPartitions=1) \
        .foreach(lambda t: print(t[0], t[1], t[2]))

Results:

[screenshot of the job output]
