J'étais sur le même problème, je ne sais pas si vous avez trouvé une solution ou non, mais j'ai pu accomplir quelque chose de similaire en procédant comme suit. Tout d'abord, j'ai ajouté un déclencheur à ma table
CREATE TRIGGER trigger_name
AFTER INSERT OR DELETE OR UPDATE
ON table_name
FOR EACH ROW
EXECUTE PROCEDURE trigger_function_name;
Cela définira un déclencheur sur la table chaque fois qu'une ligne est mise à jour, supprimée ou insérée. Ensuite, il appellera la fonction de déclenchement que j'ai configurée et qui ressemblait à ceci :
CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS
$BODY$
DECLARE
payload JSON;
BEGIN
payload = row_to_json(NEW);
PERFORM pg_notify('notification_name', payload::text);
RETURN NULL;
END;
$BODY$;
Cela me permettra "d'écouter" l'une de ces mises à jour de mon projet de démarrage de printemps et il enverra la ligne entière en tant que charge utile. Ensuite, dans mon projet de démarrage de printemps, j'ai configuré une connexion à ma base de données.
@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("host")
.database("db")
.port(port)
.username("username")
.password("password")
.schema("schema")
.connectTimeout(Duration.ofMinutes(2))
.build());
}
}
Avec cela, je l'autowire (injection de dépendance) dans le constructeur de ma classe de service et je le convertis en une classe r2dbc PostgressqlConnection comme suit :
this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();
Maintenant, nous voulons "écouter" notre table et être averti lorsque nous effectuons une mise à jour de notre table. Pour ce faire, nous avons mis en place une méthode d'initialisation qui est effectuée après l'injection de dépendances en utilisant l'annotation @PostContruct
@PostConstruct
private void postConstruct() {
postgresqlConnection.createStatement("LISTEN notification_name").execute()
.flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}
Notez que nous écoutons le nom que nous mettons dans la méthode pg_notify. Nous souhaitons également mettre en place une méthode pour fermer la connexion lorsque le bean est sur le point d'être jeté, comme ceci :
@PreDestroy
private void preDestroy() {
postgresqlConnection.close().subscribe();
}
Maintenant, je crée simplement une méthode qui renvoie un flux de tout ce qui se trouve actuellement dans ma table, et je le fusionne également avec mes notifications, comme je l'ai dit avant que les notifications n'arrivent sous forme de json, j'ai donc dû la désérialiser et j'ai décidé d'utiliser ObjectMappeur. Ainsi, cela ressemblera à ceci :
private Flux<YourClass> getUpdatedRows() {
return postgresqlConnection.getNotifications().map(notification -> {
try {
//deserialize json
return objectMapper.readValue(notification.getParameter(), YourClass.class);
} catch (IOException e) {
//handle exception
}
});
}
public Flux<YourClass> getDocuments() {
return documentRepository.findAll().share().concatWith(getUpdatedRows());
}
J'espère que cela vous aidera.Cheers !