
How to Build Document AI Pipelines with Google Cloud in 2026


Introduction

Google Cloud's Document AI pipelines turn unstructured documents (PDFs, images, scans) into actionable data such as named entities, tables, and form fields. Why does it matter in 2026? With the explosion of digitized paper, automating extraction can cut manual processing costs by as much as 80% and speeds up HR, finance, and legal workflows. This beginner tutorial walks you step by step through building a complete pipeline: from synchronous processing to asynchronous batch jobs, integrated with Cloud Storage for production-ready workflows. Imagine automatically pulling data from invoices, resumes, or contracts, all in Python, with no ML expertise needed. By the end, you'll have a copy-paste script that handles 100+ documents in parallel. Ready to scale your data?

Prerequisites

  • Active Google Cloud account with billing enabled (free $300 credit)
  • GCP Console: Enable Document AI API (https://console.cloud.google.com/apis/library/documentai.googleapis.com)
  • Python 3.9+ installed
  • gcloud CLI set up (gcloud auth login and gcloud config set project YOUR_PROJECT_ID)
  • Create a FORM_PARSER processor via the Document AI console (the eu region is recommended for European data)

Install Dependencies

setup.sh
#!/bin/bash
python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip
pip install google-cloud-documentai google-cloud-storage python-dotenv

This script sets up an isolated virtual environment and installs the official Document AI SDK plus the Storage library. Run it to avoid package conflicts. Note: if your shell reports that source is not found (for example when the script is run with plain sh), use . venv/bin/activate instead.

Set Up Your Environment

Create a .env file with your credentials: GOOGLE_CLOUD_PROJECT=your-project-id, LOCATION=eu, PROCESSOR_ID=your-processor-id (get it from Document AI console > Processors). Download a service account JSON key and set GOOGLE_APPLICATION_CREDENTIALS=PATH/TO/key.json. This keeps authentication secure without hardcoding it.
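
For reference, a minimal .env boils down to the values just described (every value is a placeholder to replace with your own; the key path line is optional, since you can also export it in your shell):

.env
GOOGLE_CLOUD_PROJECT=your-project-id
LOCATION=eu
PROCESSOR_ID=your-processor-id
# Optional: let load_dotenv() export the key path as well
GOOGLE_APPLICATION_CREDENTIALS=PATH/TO/key.json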

Process a Single Document Synchronously

sync_process.py
import os
from dotenv import load_dotenv
from google.api_core.client_options import ClientOptions
from google.cloud import documentai_v1 as documentai

load_dotenv()

location = os.getenv('LOCATION')

# Non-US processors (e.g. 'eu') need the matching regional endpoint
client = documentai.DocumentProcessorServiceClient(
    client_options=ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
)
name = f"projects/{os.getenv('GOOGLE_CLOUD_PROJECT')}/locations/{location}/processors/{os.getenv('PROCESSOR_ID')}"

with open('sample.pdf', 'rb') as pdf_file:
    pdf_content = pdf_file.read()

request = documentai.ProcessRequest(
    name=name,
    raw_document=documentai.RawDocument(content=pdf_content, mime_type='application/pdf')
)

result = client.process_document(request=request)
document = result.document

entities = []
for entity in document.entities:
    entities.append(f"{entity.type_}: {entity.mention_text} (confidence: {entity.confidence:.2f})")

print("Extracted entities:", entities)

This script synchronously processes a local PDF using the FORM_PARSER processor, extracts entities (names, dates, amounts), and prints them with confidence scores. Perfect for quick tests on documents under 15 pages. Note that the generic form parser surfaces its key/value pairs under document.pages[*].form_fields; the entities list is populated by specialized extractors such as the invoice parser. Pitfall: the MIME type must match exactly, or you'll get a 400 error.
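
If the entities list comes back empty with the generic form parser, here is a minimal sketch you can append to the end of sync_process.py to read key/value pairs from the page form fields instead (layout_to_text is a small helper written here, not an SDK function):

def layout_to_text(layout, text):
    # A form field's name/value is a Layout whose text_anchor indexes into document.text
    return "".join(
        text[int(segment.start_index):int(segment.end_index)]
        for segment in layout.text_anchor.text_segments
    )

for page in document.pages:
    for field in page.form_fields:
        key = layout_to_text(field.field_name, document.text).strip()
        value = layout_to_text(field.field_value, document.text).strip()
        print(f"{key}: {value} (confidence: {field.field_value.confidence:.2f})")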

Scale to Batch Processing

To scale up, switch to asynchronous mode: upload to GCS, launch a batch job, and poll for results. This is the heart of a real pipeline—it parallelizes 1000+ docs without timeouts.

Launch a Batch Job

batch_process.py
import os
from dotenv import load_dotenv
from google.api_core.client_options import ClientOptions
from google.cloud import documentai_v1 as documentai

load_dotenv()

location = os.getenv('LOCATION')

# The client endpoint must match the processor's region
client = documentai.DocumentProcessorServiceClient(
    client_options=ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
)
name = f"projects/{os.getenv('GOOGLE_CLOUD_PROJECT')}/locations/{location}/processors/{os.getenv('PROCESSOR_ID')}"

# Every file under gs://your-bucket/input/ goes into one batch job
input_config = documentai.BatchDocumentsInputConfig(
    gcs_prefix=documentai.GcsPrefix(gcs_uri_prefix='gs://your-bucket/input/')
)
output_config = documentai.DocumentOutputConfig(
    gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(gcs_uri='gs://your-bucket/output/')
)

request = documentai.BatchProcessRequest(
    name=name,
    input_documents=input_config,
    document_output_config=output_config
)

operation = client.batch_process_documents(request=request)
print(f'Job launched: {operation.operation.name}')

# Block until the long-running job finishes (raise the timeout for large batches)
operation.result(timeout=1800)

print('Batch complete! Results are in the GCS output folder.')

This launches a batch job over every PDF under the input/ prefix of your GCS bucket and writes the JSON results to the output/ folder. operation.result() blocks until the long-running job finishes, which can take several minutes for large batches. Create the your-bucket bucket first (see the sketch below). Pricing is per page and varies by processor type; check the Document AI pricing page before running large jobs.
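
If the bucket doesn't exist yet, you can create it from Python as well. A minimal sketch, assuming the placeholder name your-bucket and the EU multi-region (bucket names are globally unique, so pick your own):

from google.cloud import storage

storage_client = storage.Client()
# 'your-bucket' is a placeholder; bucket names are global across all of GCS
bucket = storage_client.create_bucket('your-bucket', location='EU')
print(f"Created bucket {bucket.name}")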

Build a Complete Pipeline with Triggers

Analogy: think of an automotive assembly line: preprocessing → Document AI → post-processing. Here, a GCS upload triggers a Cloud Function, which launches the batch job and can publish a Pub/Sub notification (a sketch of that last step follows the deploy command below).

Cloud Function Trigger for Pipeline

main.py
import functions_framework
from google.api_core.client_options import ClientOptions
from google.cloud import documentai_v1 as documentai

PROJECT_ID = 'your-project-id'
LOCATION = 'eu'
PROCESSOR_ID = 'your-processor-id'


@functions_framework.cloud_event
def process_pipeline(cloud_event):
    """Launches a Document AI batch job for each file uploaded to the bucket."""
    data = cloud_event.data
    bucket_name = data['bucket']
    file_name = data['name']

    # Skip our own batch output so the function does not re-trigger itself
    if file_name.startswith('output/'):
        return

    client = documentai.DocumentProcessorServiceClient(
        client_options=ClientOptions(api_endpoint=f"{LOCATION}-documentai.googleapis.com")
    )
    name = f"projects/{PROJECT_ID}/locations/{LOCATION}/processors/{PROCESSOR_ID}"

    input_config = documentai.BatchDocumentsInputConfig(
        gcs_documents=documentai.GcsDocuments(documents=[
            documentai.GcsDocument(gcs_uri=f'gs://{bucket_name}/{file_name}', mime_type='application/pdf')
        ])
    )
    output_config = documentai.DocumentOutputConfig(
        gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(gcs_uri=f'gs://{bucket_name}/output/')
    )

    request = documentai.BatchProcessRequest(
        name=name,
        input_documents=input_config,
        document_output_config=output_config
    )

    operation = client.batch_process_documents(request=request)
    print(f'Job launched: {operation.operation.name}')

Deploy it as a 2nd-gen Cloud Function (gcloud functions deploy process-pipeline --gen2 --region=europe-west1 --runtime python312 --source=. --entry-point process_pipeline --trigger-event-filters="type=google.cloud.storage.object.v1.finalized" --trigger-event-filters="bucket=your-bucket"). Ship a requirements.txt next to main.py listing google-cloud-documentai (and functions-framework if you also test locally). It then auto-triggers on every GCS upload. Fully scalable and serverless.
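
The Pub/Sub notification from the pipeline diagram isn't wired into main.py above. Here is a minimal sketch of that step, assuming a topic you've created yourself (doc-ai-jobs is a hypothetical name) and google-cloud-pubsub added to requirements.txt; it would go at the end of process_pipeline, right after the job is launched:

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# 'doc-ai-jobs' is a hypothetical topic; create it first (gcloud pubsub topics create doc-ai-jobs)
topic_path = publisher.topic_path('your-project-id', 'doc-ai-jobs')
# Publish the operation name so a subscriber can track or react to the job
future = publisher.publish(topic_path, data=operation.operation.name.encode('utf-8'))
future.result()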

Read and Parse Batch Results

parse_results.py
import os
import json
from dotenv import load_dotenv
from google.cloud import storage

load_dotenv()

def parse_gcs_output(bucket_name, output_prefix):
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=output_prefix)

    for blob in blobs:
        if blob.name.endswith('.json'):
            content = blob.download_as_text()
            doc = json.loads(content)['document']
            entities = []
            # Batch output uses the proto JSON (camelCase) field names
            for entity in doc.get('entities', []):
                entities.append({
                    'type': entity.get('type', ''),
                    'text': entity.get('mentionText', ''),
                    'confidence': entity.get('confidence', 0.0)
                })
            print(f"Doc {blob.name}: {entities}")

parse_gcs_output('your-bucket', 'output/')

This parses the JSON files the batch job wrote to GCS and collects the entities into Python dicts. Use it as a post-processing hook or a cron job. Pitfall: large documents are sharded into several JSON files per input, so iterate over every blob under the output prefix rather than looking for a single file.
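
As a simple post-hook, you could flush the extracted entities to CSV before loading them anywhere else. A minimal sketch, assuming you adapt parse_gcs_output to return its entities list instead of only printing it (entities_to_csv is not part of any SDK):

import csv

def entities_to_csv(entities, path='entities.csv'):
    # entities: list of dicts with 'type', 'text', 'confidence' keys, as built in parse_results.py
    with open(path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=['type', 'text', 'confidence'])
        writer.writeheader()
        writer.writerows(entities)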

End-to-End Pipeline Script

full_pipeline.py
import os
from dotenv import load_dotenv
from google.api_core.client_options import ClientOptions
from google.cloud import documentai_v1 as documentai
from google.cloud import storage

load_dotenv()

project = os.getenv('GOOGLE_CLOUD_PROJECT')
location = os.getenv('LOCATION')
processor = os.getenv('PROCESSOR_ID')
bucket = 'your-bucket'

# Upload the sample document to the input prefix
storage_client = storage.Client()
bucket_obj = storage_client.bucket(bucket)
blob = bucket_obj.blob('input/sample.pdf')
blob.upload_from_filename('sample.pdf')

# Launch the batch job over everything under input/
client = documentai.DocumentProcessorServiceClient(
    client_options=ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
)
name = f"projects/{project}/locations/{location}/processors/{processor}"
req = documentai.BatchProcessRequest(
    name=name,
    input_documents=documentai.BatchDocumentsInputConfig(
        gcs_prefix=documentai.GcsPrefix(gcs_uri_prefix=f'gs://{bucket}/input/')
    ),
    document_output_config=documentai.DocumentOutputConfig(
        gcs_output_config=documentai.DocumentOutputConfig.GcsOutputConfig(gcs_uri=f'gs://{bucket}/output/')
    )
)
operation = client.batch_process_documents(request=req)

# Wait for the long-running operation to finish
operation.result(timeout=1800)

print('Full pipeline finished!')

Standalone script: upload → batch → wait. Run with python full_pipeline.py to test the full flow in one shot. Perfect for CI/CD pipelines.

Best Practices

  • Use GDPR-compliant regions (eu) for sensitive data.
  • Batch for scale: use sync processing only for small documents and quick tests; use batch for production volumes.
  • Monitor quotas (500 ops/min) on the API's Quotas and Metrics pages in the console.
  • Keep credentials and configuration (service account keys, processor IDs) out of source code; load them from environment variables or Secret Manager.
  • Post-process: validate confidence > 0.8 before DB inserts, as sketched below.
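
A minimal sketch of that last point, filtering the dicts produced by parse_results.py before they reach your database (the 0.8 threshold is the rule of thumb above, not an API constant):

MIN_CONFIDENCE = 0.8

def keep_confident(entities, threshold=MIN_CONFIDENCE):
    # Keep only extractions whose confidence clears the threshold before any DB insert
    return [e for e in entities if e.get('confidence', 0.0) >= threshold]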

Common Errors to Avoid

  • Wrong MIME: Must be 'application/pdf' exactly, not 'pdf' → 400 Bad Request.
  • No auth: a missing GOOGLE_APPLICATION_CREDENTIALS raises a DefaultCredentialsError locally, and a service account without Document AI roles gets 403 Forbidden.
  • Aggressive polling: checking operation status more often than every ~30 s can exhaust the operation-polling quota; prefer operation.result() with a generous timeout.
  • Missing buckets: Run gsutil mb gs://your-bucket first.

Next Steps