J'ai résolu mon problème. La raison des décomptes incohérents était le MongoDefaultPartitioner qui enveloppe MongoSamplePartitioner qui utilise un échantillonnage aléatoire. Pour être honnête, c'est un défaut assez étrange pour moi. Personnellement, je préférerais avoir un partitionneur lent mais cohérent à la place. Les détails des options de partitionnement peuvent être trouvés dans les options de configuration officielles documents.
code :
val df = spark.read
.format("com.mongodb.spark.sql.DefaultSource")
.option("uri", "mongodb://127.0.0.1/enron_mail.messages")
.option("partitioner", "spark.mongodb.input.partitionerOptions.MongoPaginateBySizePartitioner ")
.load()