MongoDB
 sql >> Base de données >  >> NoSQL >> MongoDB

MongoDBObject n'est pas ajouté à l'intérieur d'une boucle rrd foreach casbah scala apache spark

Les calculs sur les RDD sont répartis sur le cluster. Vous ne pouvez pas mettre à jour une variable qui a été créée en dehors de la fermeture de l'opération RDD depuis le RDD. Ils se trouvent essentiellement à deux endroits différents :la variable est créée dans le pilote Spark et accessible dans les nœuds de calcul et doit être traitée en lecture seule.

Spark prend en charge les cumulateurs distribués qui pourraient être utilisés dans ce cas :Spark Cummulators

Une autre option (celle que je préférerais) consiste à transformer le flux de RDD dans le format de données souhaité et à utiliser le foreachRDD méthode pour le conserver dans le stockage secondaire. Ce serait une façon plus fonctionnelle d'aborder le problème. Cela ressemblerait à peu près à ceci :

  val filteredStream = twitterStream.filter(entry => filters.exists(term => entry.getText.getStatus.contains(term)))
  val filteredStreamWithTs = filteredStream.map(x => ((DateTime.now.toString(), x)))
  filteredStreamWithTs.foreachRdd(rdd => // write to Mongo)