Dans Spark, les fonctions sur RDD
s (comme map
ici) sont sérialisés et envoyés aux exécuteurs pour traitement. Cela implique que tous les éléments contenus dans ces opérations doivent être sérialisables.
La connexion Redis ici n'est pas sérialisable car elle ouvre des connexions TCP à la base de données cible qui sont liées à la machine sur laquelle elle est créée.
La solution consiste à créer ces connexions sur les exécuteurs, dans le contexte d'exécution local. Il y a peu de façons de le faire. Deux qui me viennent à l'esprit :
rdd.mapPartitions
:permet de traiter une partition entière d'un coup, et donc d'amortir le coût de création des connexions)- Gestionnaires de connexion singleton :créez la connexion une fois par exécuteur
mapPartitions
est plus facile car il suffit d'un petit changement dans la structure du programme :
val perhit = perhitFile.mapPartitions{partition =>
val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
val res = partition.map{ x =>
...
val refStr = r.hmget(...) // use r to process the local data
}
r.close // take care of resources
res
}
Un gestionnaire de connexions singleton peut être modélisé avec un objet contenant une référence paresseuse à une connexion (remarque :une référence mutable fonctionnera également).
object RedisConnection extends Serializable {
lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
Cet objet peut ensuite être utilisé pour instancier 1 connexion par JVM worker et est utilisé comme Serializable
objet dans une fermeture d'opération.
val perhit = perhitFile.map{x =>
val param = f(x)
val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
}
}
L'avantage d'utiliser l'objet singleton est moins de surcharge car les connexions ne sont créées qu'une seule fois par JVM (par opposition à 1 par partition RDD)
Il y a aussi quelques inconvénients :
- le nettoyage des connexions est délicat (hook d'arrêt/minuteries)
- il faut garantir la sécurité des threads des ressources partagées
(*) code fourni à titre indicatif. Non compilé ou testé.