JSON reader expected token type 'LEFT_PAREN' but found ',' while running a Spark Structured Streaming job to a MongoDB sink

I am running a Spark Structured Streaming job with a MongoDB sink and I encounter this error:

org.bson.json.JsonParseException: JSON reader expected token type 'LEFT_PAREN' but found ','

I have created MongoDBForeachWriter.scala, Helpers.scala and StructuredStreamingProgram.scala (shown below), compiled them into a jar, and run the jar with spark-submit.
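
For reference, here is a small standalone check I can run outside the streaming job (just a sketch; the ValueJsonCheck object name and the sample payloads are made up). It feeds candidate value strings to the same mutable Document(String) parser that the writer uses, to see whether they parse as JSON at all:

// Standalone sketch (not part of the streaming job): feed candidate value
// strings to the same Document(String) parser the writer uses and print
// whether they parse. The sample payloads below are made up.
import org.mongodb.scala.bson.collection.mutable.Document
import scala.util.{Failure, Success, Try}

object ValueJsonCheck {
  def main(args: Array[String]): Unit = {
    val samples = Seq(
      """{"event":"click","ts":"2019-01-01T00:00:00Z"}""", // a JSON object, parses
      "not valid json at all"                              // anything else fails to parse
    )

    samples.foreach { s =>
      Try(Document(s)) match {
        case Success(_)  => println(s"parsed OK : $s")
        case Failure(ex) => println(s"parse fail: $s -> ${ex.getClass.getName}: ${ex.getMessage}")
      }
    }
  }
}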

package example
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.util.LongAccumulator
import example.Helpers._
import java.util.Calendar

object StructuredStreamingProgram {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("StructuredStreaming")
      .getOrCreate()

    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "ausilkaf501.us.dell.com:9092")
      .option("subscribe", "testing_streaming")
      .load()
    val dfs = df.selectExpr("CAST(value AS STRING) as json").toDF()

    // sends to MongoDB once every 20 seconds
    val mongodb_uri = "mongodb://hanzomdbuser:hanzomdbpswd@dstk8sdev06.us.dell.com"
    val mdb_name = "HANZO_MDB"
    val mdb_collection = "testing_streaming"
    val CountAccum: LongAccumulator = spark.sparkContext.longAccumulator("mongostreamcount")

    val structuredStreamForeachWriter: MongoDBForeachWriter = new MongoDBForeachWriter(mongodb_uri,mdb_name,mdb_collection,CountAccum)
    val query = df.writeStream
      .foreach(structuredStreamForeachWriter)
      .trigger(Trigger.ProcessingTime("20 seconds"))
      .start()

    while (!spark.streams.awaitAnyTermination(60000)) {
      println(Calendar.getInstance().getTime()+" :: EventsCount = "+CountAccum.value)
    }

  }
}

package example
import java.util.Calendar
import org.apache.spark.util.LongAccumulator
import org.apache.spark.sql.Row
import org.apache.spark.sql.ForeachWriter
import org.mongodb.scala._
import org.mongodb.scala.bson.collection.mutable.Document
import org.mongodb.scala.bson._
import example.Helpers._
import scala.util.Try


class MongoDBForeachWriter(p_uri: String, p_dbName: String, p_collectionName: String, p_messageCountAccum: LongAccumulator) extends ForeachWriter[Row] {

  val mongodbURI = p_uri
  val dbName = p_dbName
  val collectionName = p_collectionName
  val messageCountAccum = p_messageCountAccum


  var mongoClient: MongoClient = null
  var db: MongoDatabase = null
  var collection: MongoCollection[Document] = null

  def ensureMongoDBConnection(): Unit = {
    if (mongoClient == null) {
      mongoClient = MongoClient(mongodbURI)
      db = mongoClient.getDatabase(dbName)
      collection = db.getCollection(collectionName)
    }
  }

  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  override def process(record: Row): Unit = {
    val valueStr = new String(record.getAs[Array[Byte]]("value"))

    val doc = Document(valueStr)
    doc += ("log_time" -> Calendar.getInstance().getTime())

    // lazy opening of MongoDB connection
    ensureMongoDBConnection()
    val result = collection.insertOne(doc).results()

    // tracks how many records I have processed
    if (messageCountAccum != null)
      messageCountAccum.add(1)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if(mongoClient != null) {
      Try {
        mongoClient.close()
      }
    }
  }
}

package example

import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.mongodb.scala._

object Helpers {

  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
    override val converter: Document => String = doc => doc.toJson
  }

  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
    override val converter: C => String = doc => doc.toString
  }

  trait ImplicitObservable[C] {
    val observable: Observable[C]
    val converter: C => String

    def results(): Seq[C] = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS))
    def headResult(): C = Await.result(observable.head(), Duration(10, TimeUnit.SECONDS))
    def printResults(initial: String = ""): Unit = {
      if (initial.length > 0) print(initial)
      results().foreach(res => println(converter(res)))
    }
    def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
  }

}

