Ingestion de données en temps réel

Les données sont aujourd’hui porteuses d’un potentiel incroyable avec une quantité toujours plus importante au fil des années. En effet, près de 90% de la donnée digitale que nous possédons aujourd’hui a été créée ces deux dernières années exigeant d’outre-passer les technologies “classiques” du Big Data.
De cette problématique d’exploitation des données, émerge notamment l’enjeu relatif à la capacité de les traiter en temps réel. De plus en plus de secteurs s’engagent dans une migration du Big Data vers le “Fast Data”, un autre buzz word pour désigner l’application des analyses Big Data à de plus petits ensembles de données, en temps réel ou presque réel dès qu’ils sont générés.
Le secteur bancaire illustre très bien ce besoin avec la détection de fraude en temps réel.

Mais qu’est ce que l’analyse temps réel?

Si l’on se réfère aux définitions que l’on peut trouver sur le net, un système d’analyse de données temps réel est “un système événementiel disponible, scalable et stable capable de prendre des décisions (actions) avec une latence inférieure à 100ms”
Contrairement à l’analyse batch, la contrainte pour une analyse en temps réel est double: Non seulement le résultat issu du traitement doit être correct mais il doit répondre à une contrainte de temps. Certains systèmes d’analyse voient même le respect de la contrainte de temps plus importante que le résultat en lui-même.

En effet, chez ce qu’on appelle les systèmes temps réel dur (hard real time), la réponse du système dans les délais est vitale et l’absence de réponse peut s’avérer plus catastrophique qu’une réponse incorrecte (ex: contrôle aérien, contrôle d’une centrale nucléaire).
Dans ce contexte, il est indispensable pour les analystes de distinguer les données qui peuvent être stockées et conservées, et celles qui doivent être analysées immédiatement pour se révéler utiles. L’idée est alors d’exploiter les données qu’elles soient structurées ou non structurées et ce dès qu’elles sont générées, avant qu’elles ne perdent de leur intérêt. De ce fait, pour procéder à toutes ces analyses, il est indispensable de disposer d’un système de streaming capable de transférer les données dès qu’elles sont générées. En outre, il est nécessaire de disposer d’un Data Warehouse capable de traiter les données dès qu’elles y sont stockées.

Alors, quels sont les concepts derrière l’ingestion, le stockage et le traitement de données en temps réel ?

Ingestion d’un fichier CSV dans un topic kafka

Nous allons nous concentrer sur l’ingestion dans cet artixle avec l’utilisation de kafka de la plateforme Confluent à travers l’import de données d’un fichier type CSV et l’utilisation de KSQL pour l’enrichir.

Nous supposons ici que notre fichier source csv concerne des commandes effectuées par des clients: orders.csv et contient les attributs suivants:

order_id customer_id order_ts product order_total_usd
  • Télécharger sur le site de Confluent Confluent-hub-components et y télécharger le connecteur jcustenborder-kafka-connect-spooldir.
  • Vérifier que dans $PATH_TO_CONFLUENT/etc/Kafka, le fichier config connect-standalone.properties du worker contienne
    plugin.path=share/java,$PATH_TO_CONFLUENT/share/confluent_hub_components 
    
  • Aller dans le fichier /tmp et y créer 3 dossiers : source, finished, error.
  • importer son fichier source csv dans source
  • créer dans /tmp le fichier config spool_conf.tmp avec les éléments suivants:
    input.path=/tmp/source
    finished.path=/tmp/finished
    csv.first.row.as.header=true
    

  • Aller dans $PATH_TO_CONFLUENT/ et lancer la commande:

    export CLASSPATH="$(find share/confluent-hub-components/jcustenborder-kafka-connect-spooldir/lib/ -type f -name '*.jar' | tr '\n' ':')"
    

  • Lancer la commande:

    kafka-run-class com.github.jcustenborder.kafka.connect.spooldir.SchemaGenerator -t csv -f /tmp/source/orders.csv -c /tmp/spool_conf.tmp -i order_id
    

  • Lancer la commande curl suivante:

     curl -i -X POST -H "Accept:application/json" \
           -H "Content-Type:application/json" http://localhost:8083/connectors/ \
           -d '{
    "name": "csv-source-orders",
    "config": {
    "tasks.max": "1",
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "input.file.pattern": "^orders.*.csv$",
    "input.path": "/tmp/source",
    "finished.path": "/tmp/finished",
    "error.path": "/tmp/error",
    "halt.on.error": "false",
    "topic": "orders",
    "value.schema":"{\"name\":\"com.github.jcustenborder.kafka.connect.model.Value\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"order_id\":{\"type\":\"STRING\",\"isOptional\":true},\"customer_id\":{\"type\":\"STRING\",\"isOptional\":true},\"order_ts\":{\"type\":\"STRING\",\"isOptional\":true},\"product\":{\"type\":\"STRING\",\"isOptional\":true},\"order_total_usd\":{\"type\":\"STRING\",\"isOptional\":true}}}",
    "key.schema":"{\"name\":\"com.github.jcustenborder.kafka.connect.model.Key\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"order_id\":{\"type\":\"STRING\",\"isOptional\":true}}}",
    "csv.first.row.as.header": "true"
    }
    }'
    

  • Vérifier que le connecteur est lancé avec la commande:

    curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
    

  • Vérifier que les données sont bien dans le topic:

    kafka-avro-console-consumer \
    	--bootstrap-server localhost:9092 \
    	--property schema.registry.url=http://localhost:8081 \
    	--property print.key=true \
    	--topic orders \
    	--from-beginning \
    	--max-messages 1|jq '.'
    

Utilisation de KSQL pour enrichir les données CSV dans Kafka

A partir de l’invite de commande KSQL, nous allons d’abord déclarer le stream (orders), renseigné par le topic Kafka orders que nous avons créé lors de la phase d’ingestion.

CREATE STREAM orders WITH (KAFKA_TOPIC='orders',VALUE_FORMAT='avro');

La ligne de commande suivante:

SET 'auto.offset.reset' = 'earliest';

permet de dire à KSQL de traîter toutes les données dans le topic et non seulement les nouvelles, si cette dernière avait été laissée à ‘default’.

Avec KSQL, il est possible d’inspecter le contenu d’un topic avec la commande PRINT:

PRINT 'topic' FROM BEGINNING

S’il s’agit d’un format avro, vous verrez sûrement apparaître un caractère spécial du type
+�. C’est parce qu’il s’agit d’une clé AVRO et que KSQL ne gère pas ce type de clé.
L’instruction suivante prend un topic et le partitionne explicitement selon la clé fournie (la colonne ID par exemple):

CREATE STREAM KAFKA_STREAM WITH (KAFKA_TOPIC='topic',VALUE_FORMAT='AVRO');
CREATE STREAM KAFKA_STREAM_REKEYED AS SELECT * FROM KAFKA_STREAM PARTITION BY ID;
PRINT 'KAFKA_STREAM_REKEYED' FROM BEGINNING;

Cependant ici aussi, si vous inspectez le message, nous ne pouvons pas utiliser kafka-avro-console-consumer car cela suppose que la clé est encore codée en avro ce qui n’est pas le cas. On utilise alors la commande suivante à la place:

kafkacat -C -K: -b localhost:9092 -f 'Key: %k\nValue: %s\n\n' -t KAFKA_STREAM_REKEYED -c1

On peut utiliser maintenant notre topic pour créer notre table KSQL…

CREATE TABLE KAFKA_TABLE WITH (KAFKA_TOPIC='KAFKA_STREAM_REKEYED', VALUE_FORMAT='AVRO', KEY='ID');

… Pour enfin l’utiliser pour enrichir notre stream ‘orders’ en supposant que dans notre cas, KAFKA_TABLE renseignerait des données clients par exemple (pour rester en cohérence avec notre fichier source ‘orders’).

SELECT O.ORDER_TS, O.PRODUCT, O.ORDER_TOTAL_USD, \
      K.ID, K.FIRST_NAME, K.LAST_NAME, K.EMAIL \
      FROM ORDERS O \
      LEFT OUTER JOIN KAFKA_TABLE C \
      ON O.CUSTOMER_ID = K.ID \
      LIMIT 5;

Conclusion

Pour finir, il s’agit d’applications de traitement de flux qui peuvent être créées grâce à SQL! Ici, l’objectif de la manipulation était de créer en temps réel des données enrichies à partir de la jonction de deux ensembles de données. Mais nous pourrions encore ajouter des prédicats aux données (inclure une clause WHERE), voire des agrégations.

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Ce site utilise Akismet pour réduire les indésirables. En savoir plus sur comment les données de vos commentaires sont utilisées.