Profile Software Services

Automatiza flujos recurrentes con Python, Flask y Jinja2

En muchos equipos técnicos, existen tareas que se repiten cada semana o cada mes: generación de reportes, validación de datos, procesamiento de facturas, envíos de correos automáticos… Son necesarias, pero consumen tiempo y están expuestas a errores humanos. En este post, te muestro cómo puedes automatizar cualquier flujo recurrente usando una arquitectura flexible basada en Python, Flask, SQLAlchemy y Jinja2.

Spoiler: con solo definir una receta .yml y unas plantillas .j2, podrás lanzar un flujo desde una API y dejar que un demonio lo orqueste paso a paso.

Estructura general del sistema

La arquitectura base del proyecto tiene esta estructura:

automatizacion-flujos/
├── app/
│   ├── main.py
│   ├── api.py          # API REST con Flask
│   ├── demonio.py      # Ejecuta pasos según sus dependencias
│   ├── flujos/
│   │   ├── reportes/
│   │   │   ├── recipe.yml
│   │   │   └── templates/
│   │   │       ├── validacion_datos.j2
│   │   │       ├── procesamiento_datos.j2
│   │   │       └── generacion_informe.j2
│   ├── models.py
│   ├── utils.py
├── requirements.txt

Cada carpeta dentro de /flujos, representa un tipo de flujo. Al lanzar uno desde la API, se crean registros en base de datos para cada paso, y un demonio se encarga de ejecutarlos en orden, según sus dependencias.

Librerías necesarias

El archivo requirements.txt contiene las librerías necesarias para la ejecución del proyecto:

Flask==3.0.3
Flask-RESTX==1.3.0
Flask-SQLAlchemy==3.1.1
Jinja2==3.1.2
PyYAML==6.0.2

Para instalar las dependencias ejecuta el comando pip install -r requirements.txt (Se requiere tener instalado pip). Las versiones de estas dependencias son compatibles con la versión de python 3.13. 

Define tu flujo con YAML

app/flujos/reportes/recipe.yml - Receta con las características del flujo

pasos:
  - nombre: validacion_datos
    plantilla: validacion_datos.j2
  - nombre: procesamiento_datos
    plantilla: procesamiento_datos.j2
    depende_de: validacion_datos
  - nombre: generacion_informe
    plantilla: generacion_informe.j2
    depende_de: procesamiento_datos

Cada paso puede depender de otro. Esto permite construir flujos secuenciales y condicionales fácilmente.

app/flujos/templates/validacion_datos.j2Primer paso del flujo

# Simulación de validación

fecha: {{ cuerpo_global.fecha }}
cantidad: {{ cuerpo_global.cantidad_paginas }}

✅ Datos validados para el informe del {{ cuerpo_global.fecha }}

app/flujos/templates/procesamiento_datos.j2Segundo paso del flujo

# Simulación de procesamiento

📊 Procesando datos del informe para la fecha {{ cuerpo_global.fecha }}...
Procesamiento completado correctamente.

app/flujos/templates/generacion_informe.j2Tercer paso del flujo

# Simulación de generación del informe

📝 Informe generado
Fecha: {{ cuerpo_global.fecha }}
Páginas: {{ cuerpo_global.paginas }}
Resumen: {{ cuerpo_global.resumen }}

Desarrollando la solución paso a paso

Ahora que tienes una receta YAML definida, vamos a crear la lógica necesaria para almacenar los pasos en base de datos:

app/models.py – Modelo de la base de datos

from flask_sqlalchemy import SQLAlchemy
from datetime import datetime

db = SQLAlchemy()

class PasoFlujo(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    id_flujo = db.Column(db.String(64), nullable=False)
    tipo_flujo = db.Column(db.String(64), nullable=False)
    nombre_paso = db.Column(db.String(64), nullable=False)
    plantilla = db.Column(db.String(128), nullable=False)
    cuerpo = db.Column(db.JSON, nullable=False)
    depende_de = db.Column(db.String(64), nullable=True)
    estado = db.Column(db.String(32), default='pendiente')
    fecha_alta = db.Column(db.DateTime, default=datetime.utcnow)
    fecha_estado = db.Column(db.DateTime, default=datetime.utcnow)

app/utils.py – Cargar receta y crear pasos

import yaml
from pathlib import Path
from models import PasoFlujo, db
from datetime import datetime

def cargar_recipe_y_crear_pasos(carpeta_flujo, cuerpo_global, id_flujo):
   ruta_path = Path(__file__).resolve().parent / "flujos" / carpeta_flujo / "recipe.yml"
   try:
       with open(ruta_path, 'r', encoding='utf-8') as f:
           receta = yaml.safe_load(f)
   except FileNotFoundError as e:
       raise FileNotFoundError(f"No se encontró el archivo de recipe en: {ruta_path}") from e

   pasos_creados = []
   for paso in receta['pasos']:
       nuevo_paso = PasoFlujo(
           id_flujo=id_flujo,
           tipo_flujo=carpeta_flujo,
           nombre_paso=paso['nombre'],
           plantilla=paso['plantilla'],
           cuerpo=cuerpo_global,
           depende_de=paso.get('depende_de'),
           estado='pendiente',
           fecha_alta=datetime.now(),
           fecha_estado=datetime.now()
       )
       db.session.add(nuevo_paso)
       pasos_creados.append(nuevo_paso)

   db.session.commit()
   return pasos_creados

A continuación vamos a exponer la API REST para poder ejecutar los pasos correspondientes.

app/api.py – API REST para lanzar y consultar flujos

import uuid

from flask import request
from flask_restx import Resource, Namespace, fields

from models import PasoFlujo
from utils import cargar_recipe_y_crear_pasos

api_bp = Namespace('flujos', description='Gestión de flujos automatizados')

# Definimos el modelo para la documentación Swagger
modelo_flujo = api_bp.model('LanzarFlujo', {
   'carpeta_flujo': fields.String(required=True, description='Nombre de la carpeta del flujo (ej. "reportes")'),
   'cuerpo_global': fields.Raw(required=True, description='Diccionario con los parámetros que se pasarán a las plantillas')
})

modelo_paso = api_bp.model('Paso', {
   'nombre_paso': fields.String,
   'estado': fields.String,
   'depende_de': fields.String,
   'fecha_estado': fields.String
})

modelo_flujo_detalle = api_bp.model('FlujoDetalle', {
   'id_flujo': fields.String,
   'pasos': fields.List(fields.Nested(modelo_paso))
})

@api_bp.route('/')
class LanzarFlujo(Resource):
   @api_bp.expect(modelo_flujo)
   @api_bp.response(201, 'Flujo creado')
   def post(self):
       data = request.get_json()
       carpeta_flujo = data.get("carpeta_flujo")
       cuerpo_global = data.get("cuerpo_global")

       id_flujo = str(uuid.uuid4())
       pasos_creados = cargar_recipe_y_crear_pasos(
           carpeta_flujo=carpeta_flujo,
           cuerpo_global=cuerpo_global,
           id_flujo=id_flujo
       )

       return {
           "id_flujo": id_flujo,
           "pasos_creados": [p.nombre_paso for p in pasos_creados]
       }, 201

@api_bp.route('/<string:id_flujo>')
class ObtenerFlujo(Resource):
   @api_bp.marshal_with(modelo_flujo_detalle)
   def get(self, id_flujo):
       pasos = PasoFlujo.query.filter_by(id_flujo=id_flujo).order_by(PasoFlujo.fecha_alta).all()
       return {
           'id_flujo': id_flujo,
           'pasos': pasos
       }

@api_bp.route('')
class ListarFlujos(Resource):
   def get(self):
       flujos = PasoFlujo.query.all()
       agrupado = {}
       for paso in flujos:
           flujo = agrupado.setdefault(paso.id_flujo, [])
           flujo.append({
               "nombre_paso": paso.nombre_paso,
               "estado": paso.estado,
               "depende_de": paso.depende_de,
               "fecha_estado": paso.fecha_estado.isoformat() if paso.fecha_estado else None
           })
       return agrupado, 200

Por último, crea el demonio que orquesta la ejecución y las dependencias.

app/demonio.py – Orquestador de flujos

import time
from pathlib import Path

from models import db, PasoFlujo
from main import create_app
from datetime import datetime
from jinja2 import Template

# Inicializar la app para tener contexto
app = create_app()

def renderizar_template(template_path, contexto):
  """Renderiza una plantilla Jinja2 con el contexto dado."""
  with open(template_path, "r") as file:
      content = file.read()
  template = Template(content)
  return template.render(**contexto)

def ejecutar_paso(paso: PasoFlujo):
  print(f"🟡 Ejecutando paso: {paso.nombre_paso} del flujo {paso.id_flujo}")

  ruta_template = Path(__file__).resolve().parent / "flujos" / paso.tipo_flujo / "templates" / paso.plantilla
  try:
      contenido_resuelto = renderizar_template(ruta_template, {"cuerpo_global": paso.cuerpo})

      # ✅ Guardamos el contenido resuelto directamente en el paso
      paso.cuerpo["contenido_resuelto"] = contenido_resuelto
      paso.estado = "finalizado"
      paso.fecha_estado = datetime.now()
      db.session.commit()

      print(f"▶️ Contenido resuelto del paso:\n{contenido_resuelto}")
      print(f"✅ Paso {paso.nombre_paso} finalizado.\n")

  except Exception as e:
      paso.estado = "error"
      paso.fecha_estado = datetime.now()
      paso.cuerpo["error"] = str(e)
      db.session.commit()
      print(f"❌ Error al ejecutar paso {paso.nombre_paso}: {e}")

def dependencia_finalizada(paso: PasoFlujo):
  if not paso.depende_de:
      return True
  paso_previo = PasoFlujo.query.filter_by(
      id_flujo=paso.id_flujo,
      nombre_paso=paso.depende_de
  ).first()
  return paso_previo and paso_previo.estado == "finalizado"

def ejecutar_demonio():
  print("🔁 Iniciando demonio de ejecución de flujos...")
  while True:
      with app.app_context():
          pasos_pendientes = PasoFlujo.query.filter_by(estado="pendiente").all()

          for paso in pasos_pendientes:
              if dependencia_finalizada(paso):
                  paso.estado = "en_curso"
                  paso.fecha_estado = datetime.now()
                  db.session.commit()
                  ejecutar_paso(paso)

      # Esperar 10 segundos antes de revisar de nuevo
      time.sleep(10)

if __name__ == "__main__":
  ejecutar_demonio()

app/main.py – Punto de ejecución de la aplicación

from flask import Flask
from flask_restx import Api
from models import db
from api import api_bp

def create_app():
   app = Flask(__name__)
   app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///flujos.db"
   app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False

   db.init_app(app)

   api = Api(app, title="API de Automatización de Flujos", version="1.0", doc="/swagger")
   api.add_namespace(api_bp, path="/api/flujos")

   with app.app_context():
       db.create_all()

   return app

if __name__ == "__main__":
   app = create_app()
   app.run(debug=True)

Lanza tu flujo desde la API

Una vez implementado todo el backend, puedes lanzar flujos desde la API REST.

Ejecuta la aplicación y el demonio en dos consolas diferentes para que puedas ver las ejecuciones de cada uno por separado:

python app/demonio.py

El demonio se ejecuta cada diez segundos de forma recurrente a la espera de encontrar registros en estado pendiente.

python app/main.py

En el navegador introduce la siguiente url para acceder al Swagger de la aplicación que acabas de crear.

Ejecutamos el endpoint POST /api/flujos para crear el flujo con el JSON de ejemplo:

{
  "carpeta_flujo": "reportes",
  "cuerpo_global": {
    "fecha": "2025-01-01",
    "cantidad_paginas": "12",
    "resumen": "Resumen semanal"
  }
}

Esta es la respuesta esperada por parte de la API:

Esto crea tres registros en la base de datos, uno por paso. Cada registro incluye:

Con esto ya hemos creado el primer flujo. Y solo nos queda esperar que el demonio recupere los pasos y los ejecute de forma sucesiva:

Visualiza los flujos creados

Puedes consultar un flujo completo desde la API en GET /api/flujos/{uuid_flujo}

Y verás un JSON con todos los pasos encadenados, sus estados y dependencias. También puedes consultar todos los flujos agrupados:

Conclusión y posibles siguientes pasos

Este enfoque te permite construir flujos reutilizables, parametrizados y seguros. Puedes expandirlo con:

Si tienes tareas repetitivas que aún haces a mano… es hora de automatizarlas.

¡Sigue aprendiendo sobre automatización y más mundo tech en nuestras redes sociales!

Salir de la versión móvil