Selon l'erreur, vous avez déjà une chaîne, (vous avez déjà fait df.selectExpr("CAST(value AS STRING)")
), vous devriez donc essayer d'obtenir l'événement Row en tant que String
, et non un Array[Byte]
Commencez par changer
val valueStr = new String(record.getAs[Array[Byte]]("value"))
à
val valueStr = record.getAs[String]("value")
Je comprends que vous avez peut-être déjà un cluster pour exécuter le code Spark, mais je suggérerais toujours de regarder dans le Connecteur d'évier Kafka Connect Mongo afin que vous n'ayez pas à écrire et à maintenir votre propre écrivain Mongo dans le code Spark.
Ou, vous pouvez également écrire directement des ensembles de données Spark sur mongo