Spark Big Data: Socket Streams as an Input Source
Socket Streams
Using a socket stream as a Spark Streaming data source.
- 1. Create the client program directory
cd /usr/local/spark/mycode
mkdir streaming
cd streaming
mkdir -p src/main/scala   # skip this if the directory already exists
cd src/main/scala
vim NetworkWordCount.scala
- 2. Write the client program
// Spark Streaming can listen on a socket port, receive the data sent to it, and process it.
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }
    // Reduce log noise. StreamingExamples lives in Spark's examples source
    // (org.apache.spark.examples.streaming); copy StreamingExamples.scala into this project,
    // or instead edit spark/conf/log4j.properties (copied from log4j.properties.template)
    // and change log4j.rootCategory=INFO,console to log4j.rootCategory=WARN,console.
    StreamingExamples.setStreamingLogLevels()
    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // Define the input source: hostname, port, and storage level for the received data
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
// For running interactively in spark-shell; remove this line before packaging with sbt.
// Arguments: hostname, port
NetworkWordCount.main(Array("localhost", "9999"))
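If you take the configuration-file route mentioned in the comment above, the change amounts to a single line in spark/conf/log4j.properties (copy it from log4j.properties.template first):

# spark/conf/log4j.properties
log4j.rootCategory=WARN, console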
- 3. Compile, package, and run with sbt (same procedure as in the previous post on using a file stream as the input source); a sample build file is sketched below.
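A minimal simple.sbt sketch that matches the jar name used in step 4; the exact Scala and Spark versions here are assumptions, so adjust them to your installation:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"

Running sbt package from /usr/local/spark/mycode/streaming then produces target/scala-2.11/simple-project_2.11-1.0.jar.

- 4. Launch the program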
cd /usr/local/spark/mycode/streaming
/usr/local/spark/bin/spark-submit --class "NetworkWordCount" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar localhost 9999
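NetworkWordCount connects to localhost:9999 as a client, so a server must already be listening on that port (netcat from step 5 below, or the DataSourceSocket program); until one is, the receiver will keep reporting connection errors.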
- 5. TCP server: netcat. The -l flag starts it in listen mode and -k keeps it listening for new connections after a client disconnects; whatever you type in its terminal is sent to the connected client.
nc -lk 9999
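With nc running, type a few space-separated words into its terminal, for example:

hello world hello spark

and the NetworkWordCount job should print the counts for that 1-second batch, something like (hello,2), (world,1), (spark,1).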
Alternatively, write a socket server by hand:
cd /usr/local/spark/mycode/streaming/src/main/scala
vim DataSourceSocket.scala
// DataSourceSocket.scala
// A hand-written socket server: it repeatedly picks a random line from a text file
// and sends it to every connected client, acting as the streaming data source.
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source

object DataSourceSocket {
  // Return a random index in [0, length)
  def index(length: Int) = {
    val rdm = new java.util.Random
    rdm.nextInt(length)
  }
  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println("Usage: DataSourceSocket <filename> <port> <millisecond>")
      System.exit(1)
    }
    // The file to read lines from
    val fileName = args(0)
    val lines = Source.fromFile(fileName).getLines.toList
    val rowCount = lines.length
    // Listen for client connections on the given port
    val listener = new ServerSocket(args(1).toInt)
    while (true) {
      val socket = listener.accept()
      // Serve each client on its own thread
      new Thread() {
        override def run = {
          println("Got client connected from: " + socket.getInetAddress)
          val out = new PrintWriter(socket.getOutputStream(), true)
          // Every <millisecond> ms, send one randomly chosen line to the client
          while (true) {
            Thread.sleep(args(2).toLong)
            val content = lines(index(rowCount))
            println(content)
            out.write(content + '\n')
            out.flush()
          }
          socket.close()  // never reached: the loop above runs forever
        }
      }.start()
    }
  }
}
// For running interactively in spark-shell; remove this line before packaging with sbt.
// Arguments: filename, port, interval in milliseconds
DataSourceSocket.main(Array("/home/chenbengang/ziyu_bigdata/quick_learn_spark/logfile/log3.txt", "9999", "1000"))
After compiling and packaging, start the DataSourceSocket program:
/usr/local/spark/bin/spark-submit --class "DataSourceSocket" /usr/local/spark/mycode/streaming/target/scala-2.11/simple-project_2.11-1.0.jar /home/chenbengang/ziyu_bigdata/quick_learn_spark/logfile/log3.txt 9999 1000
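Run the two programs in separate terminals: start DataSourceSocket first so that a server is listening on port 9999, then submit NetworkWordCount with localhost 9999 as in step 4. With the arguments above, DataSourceSocket sends one random line from log3.txt every 1000 ms, and NetworkWordCount prints the word counts for each 1-second batch.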