Skip to content
Learni
Voir tous les tutoriels
Google Cloud

Comment créer des pipelines Document AI en 2026

Read in English

Introduction

Les pipelines Document AI de Google Cloud transforment des documents non structurés (PDF, images, scans) en données exploitables comme des entités nommées, tableaux ou formes. Pourquoi c'est crucial en 2026 ? Avec l'explosion des données papier numérisées, automatiser l'extraction réduit les coûts manuels de 80 % et accélère les processus RH, finance ou juridiques. Ce tutoriel beginner vous guide pas à pas pour créer un pipeline complet : du traitement synchrone au batch asynchrone, en intégrant Cloud Storage pour un flux production-ready. Imaginez extraire automatiquement factures, CV ou contrats – tout en Python, sans expertise ML. À la fin, vous aurez un script copiable qui traite 100+ docs en parallèle. Prêt à scaler vos données ? (132 mots)

Prérequis

  • Compte Google Cloud activé avec facturation (crédit gratuit 300 $)
  • Console GCP : activez Document AI API (https://console.cloud.google.com/apis/library/documentai.googleapis.com)
  • Python 3.9+ installé
  • gcloud CLI configuré (gcloud auth login et gcloud config set project VOTRE_PROJECT_ID)
  • Créez un processeur FORM_PARSER via console Document AI (région eu recommandée pour Europe)

Installer les dépendances

setup.sh
#!/bin/bash
python3 -m venv venv
dource venv/bin/activate
pip install --upgrade pip
pip install google-cloud-documentai google-cloud-storage python-dotenv

Ce script crée un environnement virtuel isolé, installe le SDK Document AI officiel et les libs pour Storage. Lancez-le pour éviter les conflits de paquets. Note : Remplacez source par . sur macOS si erreur.

Configurer l'environnement

Créez un fichier .env avec vos creds : GOOGLE_CLOUD_PROJECT=your-project-id, LOCATION=eu, PROCESSOR_ID=your-processor-id (obtenu dans console Document AI > Processeurs). Téléchargez une clé service account JSON et set GOOGLE_APPLICATION_CREDENTIALS=path/TO/key.json. Cela sécurise l'auth sans exposer en code.

Traiter un document synchrone

sync_process.py
import os
from dotenv import load_dotenv
from google.cloud import documentai_v1 as documentai

load_dotenv()

client = documentai.DocumentProcessorServiceClient()
name = f"projects/{os.getenv('GOOGLE_CLOUD_PROJECT')}/locations/{os.getenv('LOCATION')}/processors/{os.getenv('PROCESSOR_ID')}"

with open('sample.pdf', 'rb') as image:
    image_content = image.read()

request = documentai.ProcessRequest(
    name=name,
    raw_document=documentai.RawDocument(content=image_content, mime_type='application/pdf')
)

result = client.process_request(request=request)
document = result.document

entities = []
for entity in document.entities:
    entities.append(f"{entity.type_}: {entity.mention_text} (confidence: {entity.confidence:.2f})")

print("Entités extraites :", entities)

Ce script traite un PDF local de façon synchrone via le processeur FORM_PARSER, extrait entités (noms, dates, montants) et affiche avec confiance. Idéal pour tests rapides < 15 pages. Piège : MIME type doit matcher exactement, sinon erreur 400.

Passer au batch processing

Pour scaler, passez au mode asynchrone : uploadez sur GCS, lancez un job batch, poll les résultats. C'est le cœur d'un pipeline : parallélise 1000+ docs sans timeout.

Lancer un job batch

batch_process.py
import os
import time
from dotenv import load_dotenv
from google.cloud import documentai_v1 as documentai

load_dotenv()

client = documentai.DocumentProcessorServiceClient()
parent = f"projects/{os.getenv('GOOGLE_CLOUD_PROJECT')}/locations/{os.getenv('LOCATION')}/processors/{os.getenv('PROCESSOR_ID')}"

input_config = documentai.BatchProcessRequest.InputConfig(
    gcs_source=documentai.GcsSource(uri='gs://your-bucket/input/*.pdf')
)
output_config = documentai.BatchProcessRequest.OutputConfig(
    gcs_destination=documentai.GcsDestination(uri='gs://your-bucket/output/')
)

request = documentai.BatchProcessRequest(
    parent=parent,
    input_config=input_config,
    output_config=output_config
)

operation = client.batch_process_documents(request=request)

while not operation.done:
    print('En cours...')
    time.sleep(60)
    operation = client.get_operation(name=operation.name)

print('Batch terminé ! Résultats dans GCS output.')

Lance un job batch sur tous les PDF d'un bucket GCS input, stocke JSON results en output. Polling évite timeouts (jobs >5min). Créez buckets your-bucket avant. Coût : ~1$/1000 pages.

Intégrer un pipeline complet avec trigger

Analogie : Comme une chaîne de montage automobile, preprocessing → Document AI → postprocessing. Ici, trigger GCS sur upload déclenche Function → batch → Pub/Sub notif.

Cloud Function trigger pipeline

main.py
import functions_framework
from google.cloud import documentai_v1 as documentai

def process_pipeline(event, context):
    client = documentai.DocumentProcessorServiceClient()
    project_id = 'your-project-id'
    location = 'eu'
    processor_id = 'your-processor-id'
    bucket_name = event['bucket']
    file_name = event['name']

    parent = f"projects/{project_id}/locations/{location}/processors/{processor_id}"
    input_config = documentai.BatchProcessRequest.InputConfig(
        gcs_source=documentai.GcsSource(uri=f'gs://{bucket_name}/{file_name}')
    )
    output_config = documentai.BatchProcessRequest.OutputConfig(
        gcs_destination=documentai.GcsDestination(uri=f'gs://{bucket_name}/output/')
    )

    request = documentai.BatchProcessRequest(
        parent=parent,
        input_config=input_config,
        output_config=output_config
    )

    operation = client.batch_process_documents(request=request)
    print(f'Job lancé : {operation.name}')

functions_framework.cloud_event(process_pipeline)

Déployez comme Cloud Function (gcloud functions deploy process-pipeline --runtime python312 --trigger-event google.storage.object.finalize --trigger-bucket your-bucket). Trigger auto sur upload GCS. Scalable serverless.

Lire et parser résultats batch

parse_results.py
import os
import json
from dotenv import load_dotenv
from google.cloud import storage

load_dotenv()

def parse_gcs_output(bucket_name, output_prefix):
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=output_prefix)

    for blob in blobs:
        if blob.name.endswith('.json'):
            content = blob.download_as_text()
            doc = json.loads(content)['document']
            entities = []
            for entity in doc.get('entities', []):
                entities.append({
                    'type': entity['type'],
                    'text': entity['mentionText'],
                    'confidence': entity['confidence']
                })
            print(f"Doc {blob.name}: {entities}")

parse_gcs_output('your-bucket', 'output/')

Parse JSON GCS générés par batch, extrait entités en dict Python. Utilisez en post-hook ou cron. Piège : JSON indenté, utilisez json.loads strict.

Pipeline end-to-end script

full_pipeline.py
import os
from dotenv import load_dotenv
from google.cloud import storage, documentai_v1 as documentai
import time

load_dotenv()

project = os.getenv('GOOGLE_CLOUD_PROJECT')
location = os.getenv('LOCATION')
processor = os.getenv('PROCESSOR_ID')
bucket = 'your-bucket'

# Upload sample
storage_client = storage.Client()
bucket_obj = storage_client.bucket(bucket)
blob = bucket_obj.blob('input/sample.pdf')
blob.upload_from_filename('sample.pdf')

# Batch
client = documentai.DocumentProcessorServiceClient()
parent = f"projects/{project}/locations/{location}/processors/{processor}"
req = documentai.BatchProcessRequest(
    parent=parent,
    input_config=documentai.BatchProcessRequest.InputConfig(gcs_source=documentai.GcsSource(uri=f'gs://{bucket}/input/')),
    output_config=documentai.BatchProcessRequest.OutputConfig(gcs_destination=documentai.GcsDestination(uri=f'gs://{bucket}/output/'))
)
operation = client.batch_process_documents(request=req)

while not operation.done:
    time.sleep(30)
    operation = client.get_operation(operation.name)

print('Pipeline complet terminé !')

Script autonome : upload → batch → wait. Lancez avec python full_pipeline.py. Teste tout le flux en 1 go. Idéal pour CI/CD.

Bonnes pratiques

  • Utilisez des régions conformes RGPD (eu) pour données sensibles.
  • Batchifiez : synchrone seulement pour <5 pages, batch pour scale.
  • Monitorez quotas (500 ops/min) via console Metrics.
  • Cachez processor ID en Secret Manager, pas hardcoded.
  • Post-traitez : Validez confiance >0.8 avant DB insert.

Erreurs courantes à éviter

  • MIME erroné : 'application/pdf' strict, pas 'pdf' → 400 Bad Request.
  • Pas d'auth : Oubli GOOGLE_APPLICATION_CREDENTIALS → 403 Forbidden.
  • Polling trop agressif : Sleep <30s throttle les ops.
  • Buckets manquants : Créez gsutil mb gs://your-bucket avant.

Pour aller plus loin