MongoDB
 sql >> Base de données >  >> NoSQL >> MongoDB

Comment résoudre com.mongodb.spark.exceptions.MongoTypeConversionException :Impossible de lancer... Java Spark

J'ai eu le même problème et sampleSize résout partiellement ce problème, mais ne le résout pas si vous avez beaucoup de données.

Voici la solution pour résoudre ce problème. Utilisez cette approche avec une augmentation de sampleSize (dans mon cas c'est 100000):

def fix_schema(schema: StructType) -> StructType:
    """Fix spark schema due to inconsistent MongoDB schema collection.

    It fixes such issues like:
        Cannot cast STRING into a NullType
        Cannot cast STRING into a StructType

    :param schema: a source schema taken from a Spark DataFrame to be fixed
    """
    if isinstance(schema, StructType):
        return StructType([fix_schema(field) for field in schema.fields])
    if isinstance(schema, ArrayType):
        return ArrayType(fix_schema(schema.elementType))
    if isinstance(schema, StructField) and is_struct_oid_obj(schema):
        return StructField(name=schema.name, dataType=StringType(), nullable=schema.nullable)
    elif isinstance(schema, StructField):
        return StructField(schema.name, fix_schema(schema.dataType), schema.nullable)
    if isinstance(schema, NullType):
        return StringType()
    return schema


def is_struct_oid_obj(struct_field: StructField) -> bool:
    """
    Checks that our schema has StructType field with single oid name inside

    :param struct_field: a StructField from Spark schema
    :return bool
    """
    return (isinstance(struct_field.dataType, StructType)
            and len(struct_field.dataType.fields) == 1
            and struct_field.dataType.fields[0].name == "oid")