Je préfère utiliser le dataframe Spark natif, car cela me permet plus de personnalisation. Je peux utiliser stringtype propriété pour convertir le champ json de la trame de données en champ jsonb dans la table. Dans ce cas, mon dataframe a deux champs.
from pyspark import SparkConf
sc = SparkContext.getOrCreate(SparkConf())
spark = SparkSession(sc)
df = spark.read.format('csv') \
.option('delimiter','|') \
.option('header','True') \
.load('your_path')
##some transformation...
url = 'jdbc:postgresql://your_host:5432/your_databasename'
properties = {'user':'*****',
'password':'*****',
'driver': "org.postgresql.Driver",
'stringtype':"unspecified"}
df.write.jdbc(url=url, table='your_tablename', mode='append', properties=properties)
Avant d'exécuter le script ci-dessus, vous devez créer la table dans postgresql, car la propriété mode est défini comme append . Ceci comme suit :
create table your_tablename
(
my_json_field jsonb,
another_field int
)