Notre

blog

Intégration de données avec Quarkus et Camel : le guide ultime

Intégration de données avec Quarkus et Camel : le guide ultime

Cet article, rédigé par l’un de nos collaborateurs, explore une solution innovante pour l’intégration de données en utilisant Quarkus et Apache Camel.

À travers une expérience pratique menée pour un acteur du secteur retail, il met en lumière les avantages de ces technologies dans un environnement Kubernetes, tout en présentant des détails techniques et des exemples concrets.

Découvrez comment cette combinaison permet de répondre aux besoins d’agilité et de performance des systèmes modernes.

Introduction

Pour un acteur du retail en région, nous avions besoin de rapidement remplacer une solution d’intégration de données éditeur par un système plus léger et avec une maitrise côté développement.

 

Il nous fallait faire de l’intégration depuis un SFTP vers un ElasticSearch, avec des csv sur un format spécifique.

 

L’infrastructure cliente étant sur Kubernetes, Quarkus était un choix idéal : les flux sont packagés nativement et déployés au travers d’images docker.

 

Pour la partie métrics : 

  Quarkus SpringBoot
Taille image Docker 250 Mo 250 Mo
Consommation RAM 150 Mo 200-250 Mo
Démarrage 2s 30s

 

Sprint et Quarkus proposent tous les deux la compilation native, réduisant ainsi encore la consommation. Tous deux utilisent GraalVM pour leur compilation.

 

Concernant Apache Camel, le framework est basé sur les différents patterns d’intégration, très actif et propose plus de 300 connecteurs ainsi que 50 formats de données (XML, Json, etc.). Il est possible de l’utiliser en Java ou Xml et possède aussi son propre DSL.

 

Parlons Développement 

Quarkus est clé en main. Comme avec SpringBoot, on va retrouver les mêmes types d’annotations mais avec une formulation différente et reprenant les annotations de base de Java. 

Par exemple : 
•    @Autowired devient @Inject
•    @SpringBootTest devient @QuarkusTest

 

Un autre avantage sur le développement de flux d’intégration est le DevService de Quarkus. De nombreux composants comme Kafka, JMS, SQL, Mongo, Keycloak… sont disponibles nativement dans Quarkus et le framework se charge tout seul de créer les containers en local et d’injecter les chaines de connexions dans votre application.

 

C’est aussi très utile dans le cadre de tests et surtout de tests d’intégration. Quarkus se charge aussi de démarrer ces conteneurs dès que les tests en ont besoin et permet par exemple de tester l’intégration en BDD ou le transfert d’un fichier vers une Java Message Service (JMS). 

 

On peut également écrire nos propres conteneurs si besoin. Par exemple, Quarkus ne fournit pas de SFTP, comme le fichier reste assez utilisé dans le SI, nous avons créé un conteneur de tests qui permet de vérifier avant la publication de l’image si tout se passe bien de A à Z. 
 

Un flux ça donne quoi ? 

Prenons ici le cas d’un flux simple de transfert de fichier contenant X lignes qui doit être intégré dans une base MongoDB ligne par ligne. 

 

2024 12 17 - Camel-Quarkus_1ok.png

 

Dans Camel, on va retrouver des connecteurs (SFTP, JMS, Rest API pour les plus classiques) ou encore des connecteurs dédiés (GCS, Azure Service Bus, AWS S3, etc.). Ces composants peuvent faire office de consommateurs qui récupèrent la donnée ou faire office de producteurs qui vont écrire la donnée.

 

Dans notre cas, nous avons donc un connecteur SFTP pour la consommation et un connecteur ElasticSearch pour la production.

 

L’autre grande famille de composants va être les processeurs. Elle couvre tous les EIP (Entreprise Integration Patterns) comme le routage, l’agrégation, l’enrichissement, etc. Ici, nous avons besoin de découper notre fichier, qui sera notre message principal, en plusieurs messages individuels. Nous utiliserons alors le Splitter.

 

Initialisation

Tout d’abord, créons notre projet Quarkus + Maven : Quarkus – Start conding with code.quarkus.io 

 

Camel supporte la version Java 21. Un projet Camel Quarkus existe d’ailleurs pour lier Quarkus avec les composants Camel.

 

Nous nous retrouvons alors avec notre projet prêt à être utilisé et nos extensions qui correspondent à notre besoin Camel : 

  • Pour toutes les fonctionnalités et EIP de base : camel-quarkus-core
  • Pour les transformations : camel-quarkus-bean et camel-quarkus-bindy 
  • Pour ElasticSearch : camel-quarkus-elasticsearch et quarkus-elasticsearch-rest-client pour la partie client authentification et dev
  • Pour le SFTP : camel-quarkus-ftp (la librairie embarque le FTP, FTPS et SFTP)

 

NB : L’extension Camel pour ElasticSearch n’est pas proposée par défaut. Il s’agit de camel-quarkus-elasticsearch et non de camel-quarkus-elasticsearch-rest-client

 

On peut importer notre projet Maven dans notre IDE favori (IntelliJ ici) et commencer à coder.

 

Notre première route

Dans un premier temps, nous allons initialiser notre route comme sur le schéma. Cela est possible avec la classe RouteBuilder. Nous allons également ajouter l’annotation @ApplicationScoped pour que Quarkus détecte notre bean et le charge dans l’application. 

 

package com.elosi.demo;

import jakarta.enterprise.context.ApplicationScoped;
import org.apache.camel.builder.RouteBuilder;

@ApplicationScoped
public class MyCustomRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
// TODO
}
}

 

Ensuite, il nous faut écrire la route. 

Cela commence par un from() avec le connecteur, suivis ensuite de des différents processeurs. Attention, il ne peut y avoir qu’un from() ! 

 

Les connecteurs sont simples à écrire, ceux sont des URI avec des paramètres. 

 

Pour notre SFTP, nous aurons donc sftp:host:port/directoryName que nous allons variabiliser pour pouvoir le modifier facilement si besoin. 

 

Pour cela, Camel nous permet directement d’inclure notre configuration (ici en YAML) dans le code de cette manière : 

 

from("sftp:{{sftp.host}}:{{sftp.port}}/{{sftp.folder}}")

 

Bien sûr, il va encore nous manquer des informations cruciales. C’est là qu’interviennent les paramètres.

 

Avec notre méthode, nous allons remplacer configure() TODO par : 

 

from("sftp:{{sftp.host}}:{{sftp.port}}/{{sftp.folder}}" +
"?username={{sftp.username}}" +
"&password={{sftp.password}}" +
"&preferredAuthentications=password")

 

Beaucoup de choses sont paramétrables, mais ce n’est pas nécessaire de le faire pour toutes. Il y a des valeurs par défaut et d’autres qui ne sont pas utilisées.

 

Ensuite notre destination, l’ElasticSearch : 
Comme c’est un endpoint de destination, on l’écrit avec to() (il peut y en avoir plusieurs) qui contient à nouveau l’URI Camel. 

 

L’opération définit comment nous voulons appeler l’API Elastic. Ici ça sera en bulk car notre csv contient de nombreuses lignes.

 

from("sftp:{{sftp.host}}:{{sftp.port}}/{{sftp.folder}}" +
"?username={{sftp.username}}" +
"&password={{sftp.password}}" +
"&preferredAuthentications=password")
.to("elasticsearch:{{elasticsearch.cluster-name}}" +
"?operation=Bulk" +
"&indexName={{elasticsearch.index}}");

 

Il nous manque ensuite une petite partie de l’intégration. En effet, nous avons un fichier et nous souhaitons avoir une Array Json pour l’intégrer via l’API bulk. 

 

Pour cela, nous allons utiliser l’extension camel-quarkus-bindy, qui permet de transformer un csv, en liste de POJO. 

 

Il faut créer le POJO avec les bonnes annotations et l’importer ensuite via bindy dans la route Camel. 

 

Notre POJO basique avec les annotations de Bindy va nous permettre de faire la transformation automatiquement.

 

package com.elosi.demo.models;

import lombok.Data;
import org.apache.camel.dataformat.bindy.annotation.CsvRecord;
import org.apache.camel.dataformat.bindy.annotation.DataField;

@Data
@CsvRecord(separator = ";")
public class GenericFormat {
@DataField(pos = 1)
private String lastName;
@Datafield(pos = 2)
private String firstName;
@DataField(pos = 3)
private String city;
}

 

On va ensuite ajouter deux lignes au niveau de la route pour transformer notre csv en POJO. On va transformer notre fichier en une liste<GenericFormat> qui pourra être consommer par notre endpoint ElasticSearch. 

 

// On ajoute ici le format de transformation
BindyCsvDataFormat genericFormat = new BindyCsvDataFormat(GenericFormat.class);
from("sftp:{{sftp.host}}:{{sftp.port}}/{{sftp.folder}}" +
"?username={{sftp.username}}" +
"&password={{sftp.password}}" +
"&preferredAuthentications=password")
/* On ajoute ici la deserialisation via le mot clé un marshall auquel on passe le format de sortie.
Cela va transformer les lignes de notre CSV en une List<GenericFormat>
*/ La sérialisation peut se faire quand à elle via un .marshall()
.unmarshall(genericFormat)
.to("elasticsearch:{{elasticsearch.cluster-name}}" +
"?operation=Bulk" +
"&indexName={{elasticsearch.index}}");

 

Pour vérifier l’indexation dans l’Elastic, on peut récupérer le code retour des insertions. Elastic renvoie un objet de type List<BulkResponseltem>. Dans l’object BulkResponseltem le champs statut nous intéresse : une 201 signifiant que le message a bien été intégré !

 

Un autre code pourrait indiquer une erreur et nous pouvons récupérer ou afficher le message d’erreur en question. Pour cela, nous allons utiliser un processeur en Camel. Il permet d’accéder à l’échange en cours et de pouvoir le manipuler, en faire un peu ce qu’on veut, voir même le remplacer. 

 

.process(exchange -> {
List<BulkResponseItem> response = exchange.getMessage().getBody(List.class);
List<BulkResponseItem> totalErrors = response.stream().filter(bulkResponseItem ->
bulkResponseItem.status() != 201).toList();
if (!totalErrors.isEmpty()) {
exchange.getMessage().setHeader(ELASTICSEARCH_ERROR_HEADER, true);
exchange.getMessage().setBody(totalErrors);
}
}).id("error_check")
.filter().header(ELASTICSEARCH_ERROR_HEADER)
.log(LoggingLevel.ERROR, "${body.size} error(s) during import on elastic")
.end()

 

On peut remarquer aussi l’utilisation d’un filter, qui permet de savoir de manière conditionnelle si on doit ou non effectuer une action durant le process de l’échange. 

 

Ici, la condition est déterminée par la valeur du header elasticsearch_error_header qui est ajouté un peu avant en cas d’erreur. 

 

On y ajoute également la liste des erreurs uniquement et on en affiche le nombre.

 

Et demain ?

On peut imaginer ajouter plusieurs formats de données en entrées et faire un mapping au format générique afin de normaliser les données dans ElasticSearch. 

On peut aussi imaginer un mode événementiel où l’arrivée du fichier déclenche l’exécution du flux, avec création du pod, etc. afin de limiter encore plus l’empreinte mémoire et se diriger vers une solution la Green IT possible.