PostgreSQL
 sql >> Base de données >  >> RDS >> PostgreSQL

@Tailable(spring-data-reactive-mongodb) équivalent dans spring-data-r2dbc

J'étais sur le même problème, je ne sais pas si vous avez trouvé une solution ou non, mais j'ai pu accomplir quelque chose de similaire en procédant comme suit. Tout d'abord, j'ai ajouté un déclencheur à ma table

CREATE TRIGGER trigger_name
    AFTER INSERT OR DELETE OR UPDATE 
    ON table_name
    FOR EACH ROW
    EXECUTE PROCEDURE trigger_function_name;

Cela définira un déclencheur sur la table chaque fois qu'une ligne est mise à jour, supprimée ou insérée. Ensuite, il appellera la fonction de déclenchement que j'ai configurée et qui ressemblait à ceci :

CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS 
$BODY$
DECLARE
    payload JSON;
BEGIN
    payload = row_to_json(NEW);
    PERFORM pg_notify('notification_name', payload::text);
    RETURN NULL;
END;
$BODY$;

Cela me permettra "d'écouter" l'une de ces mises à jour de mon projet de démarrage de printemps et il enverra la ligne entière en tant que charge utile. Ensuite, dans mon projet de démarrage de printemps, j'ai configuré une connexion à ma base de données.

@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
                .host("host")
                .database("db")
                .port(port)
                .username("username")
                .password("password")
                .schema("schema")
                .connectTimeout(Duration.ofMinutes(2))
                .build());
    }
}

Avec cela, je l'autowire (injection de dépendance) dans le constructeur de ma classe de service et je le convertis en une classe r2dbc PostgressqlConnection comme suit :

this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();

Maintenant, nous voulons "écouter" notre table et être averti lorsque nous effectuons une mise à jour de notre table. Pour ce faire, nous avons mis en place une méthode d'initialisation qui est effectuée après l'injection de dépendances en utilisant l'annotation @PostContruct

@PostConstruct
private void postConstruct() {
    postgresqlConnection.createStatement("LISTEN notification_name").execute()
            .flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}

Notez que nous écoutons le nom que nous mettons dans la méthode pg_notify. Nous souhaitons également mettre en place une méthode pour fermer la connexion lorsque le bean est sur le point d'être jeté, comme ceci :

@PreDestroy
private void preDestroy() {
    postgresqlConnection.close().subscribe();
}

Maintenant, je crée simplement une méthode qui renvoie un flux de tout ce qui se trouve actuellement dans ma table, et je le fusionne également avec mes notifications, comme je l'ai dit avant que les notifications n'arrivent sous forme de json, j'ai donc dû la désérialiser et j'ai décidé d'utiliser ObjectMappeur. Ainsi, cela ressemblera à ceci :

private Flux<YourClass> getUpdatedRows() {
    return postgresqlConnection.getNotifications().map(notification -> {
        try {
            //deserialize json
            return objectMapper.readValue(notification.getParameter(), YourClass.class);
        } catch (IOException e) {
            //handle exception
        }
    });
}

public Flux<YourClass> getDocuments() {
    return documentRepository.findAll().share().concatWith(getUpdatedRows());
}

J'espère que cela vous aidera.Cheers !