
大数据分析学习第十三课 实战使用Idea创建java程序运行spark的wordcount
开发工具:Intellij IDEAJava版本:JDK1.8以上在pom.xml文件中添加我们需要的jar包:spark-core<!--定义spark版本--><properties><spark.version>2.2.3</spark.version></properties><!--spark-core核心包-->&
一 开发环境
开发工具:Intellij IDEA2021
Java版本:JDK1.8以上
Scala版本:2.12.14
Spark版本:3.1.2
Hadoop版本:3.2.0
二 Idea新建一个Maven项目
配置pom.xml文件,添加我们需要的jar包:
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<scala.version>2.12.14</scala.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>3.2.0</hadoop.version>
<hive.version>3.1.2</hive.version>
<spark.version>3.1.2</spark.version>
</properties>
<dependencies>
<dependency>
<groupId>com.thoughtworks.paranamer</groupId>
<artifactId>paranamer</artifactId>
<version>2.8</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<!-- scala语言核心包 -->
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<exclusions>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
</exclusions>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-make:transitive</arg>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<!-- 由于我们的程序可能有很多,所以这里可以不用指定main方法所在的类名,我们可以在提交spark程序的时候手动指定要调用那个main方法 -->
<!--
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.xuebusi.spark.WordCount</mainClass>
</transformer>
</transformers>
-->
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
三 编码
右击项目文件夹scala,新建scala class,选择object
编写以下代码
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]) {
//创建SparkConf
val conf: SparkConf = new SparkConf()
//创建SparkContext
val sc: SparkContext = new SparkContext(conf)
//从文件读取数据
val lines: RDD[String] = sc.textFile(args(0))
//按空格切分单词
val words: RDD[String] = lines.flatMap(_.split(" "))
//单词计数,每个单词每出现一次就计数为1
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
//聚合,统计每个单词总共出现的次数
val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
//排序,根据单词出现的次数排序
val fianlResult: RDD[(String, Int)] = result.sortBy(_._2, false)
//将统计结果保存到文件
fianlResult.saveAsTextFile(args(1))
//释放资源
sc.stop()
}
}
注意我们的main方法需要传入2个参数,args(0)是需要分析的HDFS文件路径
args(1)是分析结果存储的HFDS路径
我们执行打包,双击右侧Maven面板的package,结果报下面错误
Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile
造成以上错误的原因是
mvn cleanpackage默认只处理java源代码的编译、打包,而不管scala,所以编译时遇到Wordcount这个由scala语言编写的class,此时scala还没编译生成class,所以找不到相应的调用入口。
解决办法,我们在命令行执行,优先编译scala
mvn clean scala:compile scala:testCompile compile -X package
可以看结果,打包成功
我们看项目目录,已经生成了jar文件
四 提交集群
我们把打包的jar文件 SparkWordCount-1.0-SNAPSHOT.jar 上传到linux服务器
启动HDFS,Spark master和worker
HDFS
Spark
我们对/word0326.txt进行单词计数,我们cd到spark目录,提交以下命令
bin/spark-submit --class WordCount --executor-memory 512m --total-executor-cores 1 /root/software/sparktest/SparkWordCount-1.0-SNAPSHOT.jar hdfs://master104:9000/word0326.txt hdfs://master104:9000/output
注意最后加粗的是我们jar包的2个参数
可以看到执行成功了,我们去HDFS管理界面 http://master104:50070/explorer.html#/ 查看下结果
2个part开头的文件存储的就是我们的结果,我们在终端查看下结果,因为有2个文件,我们直接用下面命令查看所有文件
hdfs dfs -cat /output/part-*
结果如下
总结
感谢能看到这里的朋友😉
本次的分享就到这里,猫头鹰数据致力于为大家分享技术干货😎
如果以上过程中出现了任何的纰漏错误,烦请大佬们指正😅
受益的朋友或对技术感兴趣的伙伴记得点赞关注支持一波🙏
也可以搜索关注我的微信公众号【猫头鹰数据分析】,留言交流🙏
更多推荐
所有评论(0)