MongoDB
 sql >> Base de données >  >> NoSQL >> MongoDB

Couler Kafka Stream vers MongoDB à l'aide de PySpark Structured Streaming

J'ai trouvé une solution.Comme je ne pouvais pas trouver le bon pilote Mongo pour le streaming structuré, j'ai travaillé sur une autre solution.Maintenant, j'utilise la connexion directe à mongoDb et j'utilise "foreach(...)" au lieu de foreachbatch(. ..). Mon code ressemble à ceci dans le fichier testSpark.py :

....
import pymongo
from pymongo import MongoClient

local_url = "mongodb://localhost:27017"


def write_machine_df_mongo(target_df):

    cluster = MongoClient(local_url)
    db = cluster["test_db"]
    collection = db.test1

    post = {
            "machine_id": target_df.machine_id,
            "proc_type": target_df.proc_type,
            "sensor1_id": target_df.sensor1_id,
            "sensor2_id": target_df.sensor2_id,
            "time": target_df.time,
            "sensor1_val": target_df.sensor1_val,
            "sensor2_val": target_df.sensor2_val,
            }

    collection.insert_one(post)

machine_df.writeStream\
    .outputMode("append")\
    .foreach(write_machine_df_mongo)\
    .start()