Dans Apache Kafka, les applications Java appelées producteurs écrivent des messages structurés à un cluster Kafka (composé de courtiers). De même, les applications Java appelées consommateurs lisent ces messages à partir du même cluster. Dans certaines organisations, il existe différents groupes en charge de l'écriture et de la gestion des producteurs et des consommateurs. Dans de tels cas, un problème majeur peut être la coordination du format de message convenu entre les producteurs et les consommateurs.
Cet exemple montre comment utiliser Apache Avro pour sérialiser les enregistrements qui sont produits vers Apache Kafka tout en permettant l'évolution des schémas et la mise à jour non synchrone des applications productrices et consommatrices.
Sérialisation et désérialisation
Un enregistrement Kafka (anciennement appelé message) se compose d'une clé, d'une valeur et d'en-têtes. Kafka n'est pas conscient de la structure des données dans la clé et la valeur des enregistrements. Il les gère comme des tableaux d'octets. Mais les systèmes qui lisent les enregistrements de Kafka se soucient des données de ces enregistrements. Vous devez donc produire des données dans un format lisible. Le format de données que vous utilisez doit
- Soyez compact
- Soyez rapide à encoder et à décoder
- Autoriser l'évolution
- Autoriser les systèmes en amont (ceux qui écrivent dans un cluster Kafka) et les systèmes en aval (ceux qui lisent à partir du même cluster Kafka) à effectuer une mise à niveau vers des schémas plus récents à des moments différents
JSON, par exemple, est explicite mais n'est pas un format de données compact et est lent à analyser. Avro est un framework de sérialisation rapide qui crée une sortie relativement compacte. Mais pour lire les enregistrements Avro, vous avez besoin du schéma avec lequel les données ont été sérialisées.
Une option consiste à stocker et à transférer le schéma avec l'enregistrement lui-même. C'est très bien dans un fichier où vous stockez le schéma une fois et l'utilisez pour un grand nombre d'enregistrements. Cependant, le stockage du schéma dans chaque enregistrement Kafka ajoute une surcharge importante en termes d'espace de stockage et d'utilisation du réseau. Une autre option consiste à disposer d'un ensemble convenu de mappages identifiant-schéma et à faire référence aux schémas par leurs identifiants dans l'enregistrement.
De l'objet à l'enregistrement Kafka et retour
Les applications de producteur n'ont pas besoin de convertir les données directement en tableaux d'octets. KafkaProducer est une classe générique qui nécessite que son utilisateur spécifie les types de clé et de valeur. Ensuite, les producteurs acceptent les instances de ProducerRecord
qui ont les mêmes paramètres de type. La conversion de l'objet en tableau d'octets est effectuée par un sérialiseur. Kafka fournit quelques sérialiseurs primitifs :par exemple, IntegerSerializer
, ByteArraySerializer
, StringSerializer
. Côté consommateur, des désérialiseurs similaires convertissent des tableaux d'octets en un objet que l'application peut gérer.
Il est donc logique de se connecter au niveau du sérialiseur et du désérialiseur et de permettre aux développeurs d'applications de production et de consommation d'utiliser l'interface pratique fournie par Kafka. Bien que les dernières versions de Kafka autorisent les ExtendedSerializers
et ExtendedDeserializers
pour accéder aux en-têtes, nous avons décidé d'inclure l'identifiant de schéma dans la clé et la valeur des enregistrements Kafka au lieu d'ajouter des en-têtes d'enregistrement.
Avro Essentials
Avro est un framework de sérialisation de données (et d'appel de procédure à distance). Il utilise un document JSON appelé schéma pour décrire les structures de données. La plupart des utilisations d'Avro se font via GenericRecord ou des sous-classes de SpecificRecord. Les classes Java générées à partir des schémas Avro sont des sous-classes de ces derniers, tandis que les premières peuvent être utilisées sans connaissance préalable de la structure de données utilisée.
Lorsque deux schémas satisfont un ensemble de règles de compatibilité, les données écrites avec un schéma (appelé le schéma de l'écrivain) peuvent être lues comme si elles étaient écrites avec l'autre (appelé le schéma du lecteur). Les schémas ont une forme canonique qui contient tous les détails non pertinents pour la sérialisation, tels que les commentaires, supprimés pour faciliter la vérification d'équivalence.
VersionedSchema et SchemaProvider
Comme mentionné précédemment, nous avons besoin d'un mappage un à un entre les schémas et leurs identifiants. Parfois, il est plus facile de se référer aux schémas par des noms. Lorsqu'un schéma compatible est créé, il peut être considéré comme une version suivante du schéma. Ainsi, nous pouvons faire référence à des schémas avec un couple nom, version. Appelons le schéma, son identifiant, son nom et sa version ensemble un VersionedSchema
. Cet objet peut contenir des métadonnées supplémentaires requises par l'application.
public class VersionedSchema { private final int id; private final String name; private final int version; private final Schema schema; public VersionedSchema(int id, String name, int version, Schema schema) { this.id = id; this.name = name; this.version = version; this.schema = schema; } public String getName() { return name; } public int getVersion() { return version; } public Schema getSchema() { return schema; } public int getId() { return id; } }
SchemaProvider
les objets peuvent rechercher les instances de VersionedSchema
.
public interface SchemaProvider extends AutoCloseable { public VersionedSchema get(int id); public VersionedSchema get(String schemaName, int schemaVersion); public VersionedSchema getMetadata(Schema schema); }
La façon dont cette interface est implémentée est couverte dans "Mise en œuvre d'un magasin de schémas" dans un futur article de blog.
Sérialisation des données génériques
Lors de la sérialisation d'un enregistrement, nous devons d'abord déterminer quel schéma utiliser. Chaque enregistrement a un getSchema
méthode. Mais trouver l'identifiant à partir du schéma peut prendre du temps. Il est généralement plus efficace de définir le schéma au moment de l'initialisation. Cela peut se faire directement par identifiant ou par nom et version. De plus, lors de la production sur plusieurs sujets, nous pouvons souhaiter définir différents schémas pour différents sujets et découvrir le schéma à partir du nom de sujet fourni en tant que paramètre de la méthode serialize(T, String)
. Cette logique est omise dans nos exemples par souci de brièveté et de simplicité.
private VersionedSchema getSchema(T data, String topic) { return schemaProvider.getMetadata( data.getSchema()); }
Avec le schéma en main, nous devons le stocker dans notre message. La sérialisation de l'ID dans le cadre du message nous donne une solution compacte, car toute la magie se produit dans le sérialiseur/désérialiseur. Il permet également une intégration très facile avec d'autres frameworks et bibliothèques qui prennent déjà en charge Kafka et permet à l'utilisateur d'utiliser son propre sérialiseur (tel que Spark).
En utilisant cette approche, nous écrivons d'abord l'identifiant du schéma sur les quatre premiers octets.
private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException { try (DataOutputStream os = new DataOutputStream(stream)) { os.writeInt(id); } }
Ensuite, nous pouvons créer un DatumWriter
et sérialiser l'objet.
private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException { BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null); DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema); datumWriter.write(data, encoder); encoder.flush(); }
En mettant tout cela ensemble, nous avons implémenté un sérialiseur de données générique.
public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> { private SchemaProvider schemaProvider; @Override public void configure(Map<String, ?> configs, boolean isKey) { schemaProvider = SchemaUtils.getSchemaProvider(configs); } @Override public byte[] serialize(String topic, T data) { try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { VersionedSchema schema = getSchema(data, topic); writeSchemaId(stream, schema.getId()); writeSerializedAvro(stream, data, schema.getSchema()); return stream.toByteArray(); } catch (IOException e) { throw new RuntimeException("Could not serialize data", e); } } private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...} private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...} private VersionedSchema getSchema(T data, String topic) {...} @Override public void close() { try { schemaProvider.close(); } catch (Exception e) { throw new RuntimeException(e); } } }
Désérialisation des données génériques
La désérialisation peut fonctionner avec un seul schéma (avec lequel les données du schéma ont été écrites), mais vous pouvez spécifier un schéma de lecteur différent. Le schéma du lecteur doit être compatible avec le schéma avec lequel les données ont été sérialisées, mais n'a pas besoin d'être équivalent. Pour cette raison, nous avons introduit des noms de schéma. Nous pouvons maintenant spécifier que nous voulons lire les données avec une version spécifique d'un schéma. Au moment de l'initialisation, nous lisons les versions de schéma souhaitées par nom de schéma et stockons les métadonnées dans readerSchemasByName
pour un accès rapide. Nous pouvons maintenant lire chaque enregistrement écrit avec une version compatible du schéma comme s'il avait été écrit avec la version spécifiée.
@Override public void configure(Map<String, ?> configs, boolean isKey) { this.schemaProvider = SchemaUtils.getSchemaProvider(configs); this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider); }
Lorsqu'un enregistrement doit être désérialisé, nous lisons d'abord l'identifiant du schéma du rédacteur. Cela permet de rechercher le schéma du lecteur par son nom. Avec les deux schémas disponibles, nous pouvons créer un GeneralDatumReader
et lire le compte rendu.
@Override public GenericData.Record deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { int schemaId = readSchemaId(stream); VersionedSchema writerSchema = schemaProvider.get(schemaId); VersionedSchema readerSchema = readerSchemasByName.get(writerSchema.getName()); GenericData.Record avroRecord = readAvroRecord(stream, writerSchema.getSchema(), readerSchema.getSchema()); return avroRecord; } catch (IOException e) { throw new RuntimeException(e); } } private int readSchemaId(InputStream stream ) throws IOException { try(DataInputStream is = new DataInputStream(stream)) { return is.readInt(); } } private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException { DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null); GenericData.Record record = new GenericData.Record(readerSchema); datumReader.read(record, decoder); return record; }
Traiter des enregistrements spécifiques
Le plus souvent, il y a une classe que nous voulons utiliser pour nos enregistrements. Cette classe est alors généralement générée à partir d'un schéma Avro. Apache Avro fournit des outils pour générer du code Java à partir de schémas. L'un de ces outils est le plugin Avro Maven. Les classes générées ont le schéma à partir duquel elles ont été générées disponibles au moment de l'exécution. Cela rend la sérialisation et la désérialisation plus simples et plus efficaces. Pour la sérialisation, nous pouvons utiliser la classe pour connaître l'identifiant de schéma à utiliser.
@Override public void configure(Map<String, ?> configs, boolean isKey) { String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString(); try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) { Class<?> recordClass = Class.forName(className); Schema writerSchema = new SpecificData(recordClass.getClassLoader()).getSchema(recordClass); this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId(); } catch (Exception e) { throw new RuntimeException(e); } }
Ainsi, nous n'avons pas besoin de la logique pour déterminer le schéma à partir du sujet et des données. Nous utilisons le schéma disponible dans la classe d'enregistrement pour écrire des enregistrements.
De même, pour la désérialisation, le schéma du lecteur peut être trouvé à partir de la classe elle-même. La logique de désérialisation devient plus simple, car le schéma du lecteur est fixé au moment de la configuration et n'a pas besoin d'être recherché par nom de schéma.
@Override public T deserialize(String topic, byte[] data) { try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) { int schemaId = readSchemaId(stream); VersionedSchema writerSchema = schemaProvider.get(schemaId); return readAvroRecord(stream, writerSchema.getSchema(), readerSchema); } catch (IOException e) { throw new RuntimeException(e); } } private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException { DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null); return datumReader.read(null, decoder); }
Lecture supplémentaire
Pour plus d'informations sur la compatibilité des schémas, consultez la spécification Avro pour la résolution de schéma.
Pour plus d'informations sur les formes canoniques, consultez la spécification Avro pour l'analyse de la forme canonique pour les schémas.
La prochaine fois…
La partie 2 montrera une implémentation d'un système pour stocker les définitions de schéma Avro.