Spark Streaming + ELK

Pooria Tgh
Mar 11, 2021

In my very first telecom job, I managed the logs generated during real-time event processing with Spark Streaming. As always, the volume of records proved to be a big data challenge, so I turned to a well-known solution: ELK.

Spark Streaming concept

I divided the task into smaller parts. First, I had to capture useful information from query processing in Spark Streaming, using the Scala language. This little bit of Scala code saved me time :).

Reference on Stack Overflow:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

class EventCollector extends StreamingQueryListener {
  // Called once when a streaming query starts
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    println("Start")
  }

  // Called after every micro-batch; event.progress carries the metrics
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println(event.progress.prettyJson)
  }

  // Called when the query stops or fails
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    println("Term")
  }
}
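
To actually receive these callbacks, the listener must be registered on the SparkSession's streaming query manager. A minimal sketch, assuming a SparkSession named spark:

// Register the listener so Spark invokes the callbacks for every streaming query
spark.streams.addListener(new EventCollector)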

As you can see, the listener surfaces the important information as JSON; the compact (single-line) form is available via this line of code:

event.progress.json

The result should look like this:

Query Progress JSON format
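
For reference, a trimmed progress report contains fields like these (the field names follow Spark's StreamingQueryProgress; the values are purely illustrative):

{
  "id" : "8c57e1ec-0f0a-4a82-a3f5-...",
  "runId" : "ae505c5a-a64e-4896-8c28-...",
  "name" : "eventQuery",
  "timestamp" : "2021-03-11T10:15:00.000Z",
  "batchId" : 2,
  "numInputRows" : 150,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.5,
  "durationMs" : { "triggerExecution" : 750 },
  "sources" : [ ... ],
  "sink" : { "description" : "..." }
}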

This JSON could be saved to a file or sent to a document-oriented database like MongoDB, but I wanted more sophisticated tooling for managing logs, so I used the ELK stack to store and analyze the records.

ELK Concept

This code enabled me to write my data into Elasticsearch with proper authentication.

df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "<your address>")
  .option("es.port", "<your port>")
  .option("es.net.http.auth.user", "<elastic username>")
  .option("es.net.http.auth.pass", "<elastic password>")
  .mode("append") // or "overwrite", depending on how you manage the index
  .save("<your index name>")

It is vital to pick the connector package that matches the versions in your big data stack. You can find it through the Maven repository.
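
For example, with sbt, assuming Spark 3.x on Scala 2.12 (the artifact and version here are illustrative and must match your own stack):

// build.sbt: ES-Hadoop connector for Spark 3 / Scala 2.12
libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-30_2.12" % "7.12.0"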
