Spark Streaming + ELK
I managed the logs generated during real-time event processing with Spark Streaming in my very first Telecom job. As always, the sheer volume of records proved to be a big data challenge, so I used a well-known solution: the ELK stack (Elasticsearch, Logstash, Kibana).
The task was divided into smaller parts. First, I had to capture useful information about query progress in Spark Structured Streaming using Scala. This little bit of Scala code saved my time :).
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

class EventCollector extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    println("Start")
  }
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // event.progress holds the StreamingQueryProgress for the latest trigger
    println(event.progress.prettyJson)
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    println("Term")
  }
}

// Register the listener on your SparkSession so it receives the events:
// spark.streams.addListener(new EventCollector)
As you can see, the listener returns the important information as JSON via this expression:
event.progress.prettyJson
The results should look something like this:
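An abridged illustration of the shape of that JSON — the field names follow Spark's StreamingQueryProgress, but the values below are made up for the example:

```json
{
  "id" : "00000000-0000-0000-0000-000000000000",
  "runId" : "11111111-1111-1111-1111-111111111111",
  "name" : "event-counts",
  "timestamp" : "2020-01-01T00:00:00.000Z",
  "batchId" : 42,
  "numInputRows" : 1000,
  "inputRowsPerSecond" : 250.0,
  "processedRowsPerSecond" : 500.0
}
```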
This JSON could be saved to a file or sent to a document-oriented database like MongoDB, but I decided to use more sophisticated tooling to manage the logs, so I used ELK to store and analyze the records.
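As a minimal sketch of the save-to-file route, one could append each progress JSON as a line to a local log file (the helper name `appendProgressJson` and the path are my own illustration, not from the original setup):

```scala
import java.nio.file.{Files, Paths, StandardOpenOption}

// Hypothetical helper: append one progress-JSON record as a line to a log file.
def appendProgressJson(json: String, path: String): Unit = {
  Files.write(
    Paths.get(path),
    (json + System.lineSeparator()).getBytes("UTF-8"),
    StandardOpenOption.CREATE,
    StandardOpenOption.APPEND
  )
}
```

Calling this from `onQueryProgress` with `event.progress.json` would build a newline-delimited JSON log that a shipper such as Logstash or Filebeat could later forward into Elasticsearch.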
This code enabled me to write my data into Elasticsearch with proper authentication.
df.write.format("org.elasticsearch.spark.sql")
  .option("es.nodes", "your-elasticsearch-address")
  .option("es.port", "your-port") // 9200 by default
  .option("es.net.http.auth.user", "your-elastic-username")
  .option("es.net.http.auth.pass", "your-elastic-password")
  .mode("append") // pick the SaveMode your job needs
  .save("your-index-name") // Elasticsearch writes into an index
It is vital to find the appropriate connector package for your project based on the versions of your big data stack. You can find it through the Maven repository.
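For instance, assuming Spark 3.x built against Scala 2.12, the sbt coordinate might look like the sketch below — the version number is a placeholder, so check the Maven repository for the one matching your Spark, Scala, and Elasticsearch versions:

```scala
// build.sbt — hypothetical version; match it to your own stack
libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-30_2.12" % "8.4.3"
```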