En muchos equipos técnicos, existen tareas que se repiten cada semana o cada mes: generación de reportes, validación de datos, procesamiento de facturas, envíos de correos automáticos… Son necesarias, pero consumen tiempo y están expuestas a errores humanos. En este post, te muestro cómo puedes automatizar cualquier flujo recurrente usando una arquitectura flexible basada en Python, Flask, SQLAlchemy y Jinja2.
Spoiler: con solo definir una receta .yml y unas plantillas .j2, podrás lanzar un flujo desde una API y dejar que un demonio lo orqueste paso a paso.
Estructura general del sistema
La arquitectura base del proyecto tiene esta estructura:
automatizacion-flujos/ ├── app/ │ ├── main.py │ ├── api.py # API REST con Flask │ ├── demonio.py # Ejecuta pasos según sus dependencias │ ├── flujos/ │ │ ├── reportes/ │ │ │ ├── recipe.yml │ │ │ └── templates/ │ │ │ ├── validacion_datos.j2 │ │ │ ├── procesamiento_datos.j2 │ │ │ └── generacion_informe.j2 │ ├── models.py │ ├── utils.py ├── requirements.txt
Cada carpeta dentro de /flujos, representa un tipo de flujo. Al lanzar uno desde la API, se crean registros en base de datos para cada paso, y un demonio se encarga de ejecutarlos en orden, según sus dependencias.
Librerías necesarias
El archivo requirements.txt contiene las librerías necesarias para la ejecución del proyecto:
Flask==3.0.3 Flask-RESTX==1.3.0 Flask-SQLAlchemy==3.1.1 Jinja2==3.1.2 PyYAML==6.0.2
Para instalar las dependencias ejecuta el comando pip install -r requirements.txt (Se requiere tener instalado pip). Las versiones de estas dependencias son compatibles con la versión de python 3.13.
Define tu flujo con YAML
Receta con las características del flujoapp/flujos/reportes/recipe.yml -
pasos:
- nombre: validacion_datos
plantilla: validacion_datos.j2
- nombre: procesamiento_datos
plantilla: procesamiento_datos.j2
depende_de: validacion_datos
- nombre: generacion_informe
plantilla: generacion_informe.j2
depende_de: procesamiento_datosCada paso puede depender de otro. Esto permite construir flujos secuenciales y condicionales fácilmente.
app/flujos/templates/validacion_datos.j2 – Primer paso del flujo
# Simulación de validación
fecha: {{ cuerpo_global.fecha }}
cantidad: {{ cuerpo_global.cantidad_paginas }}
✅ Datos validados para el informe del {{ cuerpo_global.fecha }}app/flujos/templates/procesamiento_datos.j2 – Segundo paso del flujo
# Simulación de procesamiento
📊 Procesando datos del informe para la fecha {{ cuerpo_global.fecha }}...
Procesamiento completado correctamente.app/flujos/templates/generacion_informe.j2 – Tercer paso del flujo
# Simulación de generación del informe
📝 Informe generado
Fecha: {{ cuerpo_global.fecha }}
Páginas: {{ cuerpo_global.paginas }}
Resumen: {{ cuerpo_global.resumen }}Desarrollando la solución paso a paso
Ahora que tienes una receta YAML definida, vamos a crear la lógica necesaria para almacenar los pasos en base de datos:
app/models.py – Modelo de la base de datos
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
db = SQLAlchemy()
class PasoFlujo(db.Model):
id = db.Column(db.Integer, primary_key=True)
id_flujo = db.Column(db.String(64), nullable=False)
tipo_flujo = db.Column(db.String(64), nullable=False)
nombre_paso = db.Column(db.String(64), nullable=False)
plantilla = db.Column(db.String(128), nullable=False)
cuerpo = db.Column(db.JSON, nullable=False)
depende_de = db.Column(db.String(64), nullable=True)
estado = db.Column(db.String(32), default='pendiente')
fecha_alta = db.Column(db.DateTime, default=datetime.utcnow)
fecha_estado = db.Column(db.DateTime, default=datetime.utcnow)app/utils.py – Cargar receta y crear pasos
import yaml
from pathlib import Path
from models import PasoFlujo, db
from datetime import datetime
def cargar_recipe_y_crear_pasos(carpeta_flujo, cuerpo_global, id_flujo):
ruta_path = Path(__file__).resolve().parent / "flujos" / carpeta_flujo / "recipe.yml"
try:
with open(ruta_path, 'r', encoding='utf-8') as f:
receta = yaml.safe_load(f)
except FileNotFoundError as e:
raise FileNotFoundError(f"No se encontró el archivo de recipe en: {ruta_path}") from e
pasos_creados = []
for paso in receta['pasos']:
nuevo_paso = PasoFlujo(
id_flujo=id_flujo,
tipo_flujo=carpeta_flujo,
nombre_paso=paso['nombre'],
plantilla=paso['plantilla'],
cuerpo=cuerpo_global,
depende_de=paso.get('depende_de'),
estado='pendiente',
fecha_alta=datetime.now(),
fecha_estado=datetime.now()
)
db.session.add(nuevo_paso)
pasos_creados.append(nuevo_paso)
db.session.commit()
return pasos_creadosA continuación vamos a exponer la API REST para poder ejecutar los pasos correspondientes.
app/api.py – API REST para lanzar y consultar flujos
import uuid
from flask import request
from flask_restx import Resource, Namespace, fields
from models import PasoFlujo
from utils import cargar_recipe_y_crear_pasos
api_bp = Namespace('flujos', description='Gestión de flujos automatizados')
# Definimos el modelo para la documentación Swagger
modelo_flujo = api_bp.model('LanzarFlujo', {
'carpeta_flujo': fields.String(required=True, description='Nombre de la carpeta del flujo (ej. "reportes")'),
'cuerpo_global': fields.Raw(required=True, description='Diccionario con los parámetros que se pasarán a las plantillas')
})
modelo_paso = api_bp.model('Paso', {
'nombre_paso': fields.String,
'estado': fields.String,
'depende_de': fields.String,
'fecha_estado': fields.String
})
modelo_flujo_detalle = api_bp.model('FlujoDetalle', {
'id_flujo': fields.String,
'pasos': fields.List(fields.Nested(modelo_paso))
})
@api_bp.route('/')
class LanzarFlujo(Resource):
@api_bp.expect(modelo_flujo)
@api_bp.response(201, 'Flujo creado')
def post(self):
data = request.get_json()
carpeta_flujo = data.get("carpeta_flujo")
cuerpo_global = data.get("cuerpo_global")
id_flujo = str(uuid.uuid4())
pasos_creados = cargar_recipe_y_crear_pasos(
carpeta_flujo=carpeta_flujo,
cuerpo_global=cuerpo_global,
id_flujo=id_flujo
)
return {
"id_flujo": id_flujo,
"pasos_creados": [p.nombre_paso for p in pasos_creados]
}, 201
@api_bp.route('/<string:id_flujo>')
class ObtenerFlujo(Resource):
@api_bp.marshal_with(modelo_flujo_detalle)
def get(self, id_flujo):
pasos = PasoFlujo.query.filter_by(id_flujo=id_flujo).order_by(PasoFlujo.fecha_alta).all()
return {
'id_flujo': id_flujo,
'pasos': pasos
}
@api_bp.route('')
class ListarFlujos(Resource):
def get(self):
flujos = PasoFlujo.query.all()
agrupado = {}
for paso in flujos:
flujo = agrupado.setdefault(paso.id_flujo, [])
flujo.append({
"nombre_paso": paso.nombre_paso,
"estado": paso.estado,
"depende_de": paso.depende_de,
"fecha_estado": paso.fecha_estado.isoformat() if paso.fecha_estado else None
})
return agrupado, 200Por último, crea el demonio que orquesta la ejecución y las dependencias.
app/demonio.py – Orquestador de flujos
import time
from pathlib import Path
from models import db, PasoFlujo
from main import create_app
from datetime import datetime
from jinja2 import Template
# Inicializar la app para tener contexto
app = create_app()
def renderizar_template(template_path, contexto):
"""Renderiza una plantilla Jinja2 con el contexto dado."""
with open(template_path, "r") as file:
content = file.read()
template = Template(content)
return template.render(**contexto)
def ejecutar_paso(paso: PasoFlujo):
print(f"🟡 Ejecutando paso: {paso.nombre_paso} del flujo {paso.id_flujo}")
ruta_template = Path(__file__).resolve().parent / "flujos" / paso.tipo_flujo / "templates" / paso.plantilla
try:
contenido_resuelto = renderizar_template(ruta_template, {"cuerpo_global": paso.cuerpo})
# ✅ Guardamos el contenido resuelto directamente en el paso
paso.cuerpo["contenido_resuelto"] = contenido_resuelto
paso.estado = "finalizado"
paso.fecha_estado = datetime.now()
db.session.commit()
print(f"▶️ Contenido resuelto del paso:\n{contenido_resuelto}")
print(f"✅ Paso {paso.nombre_paso} finalizado.\n")
except Exception as e:
paso.estado = "error"
paso.fecha_estado = datetime.now()
paso.cuerpo["error"] = str(e)
db.session.commit()
print(f"❌ Error al ejecutar paso {paso.nombre_paso}: {e}")
def dependencia_finalizada(paso: PasoFlujo):
if not paso.depende_de:
return True
paso_previo = PasoFlujo.query.filter_by(
id_flujo=paso.id_flujo,
nombre_paso=paso.depende_de
).first()
return paso_previo and paso_previo.estado == "finalizado"
def ejecutar_demonio():
print("🔁 Iniciando demonio de ejecución de flujos...")
while True:
with app.app_context():
pasos_pendientes = PasoFlujo.query.filter_by(estado="pendiente").all()
for paso in pasos_pendientes:
if dependencia_finalizada(paso):
paso.estado = "en_curso"
paso.fecha_estado = datetime.now()
db.session.commit()
ejecutar_paso(paso)
# Esperar 10 segundos antes de revisar de nuevo
time.sleep(10)
if __name__ == "__main__":
ejecutar_demonio()app/main.py – Punto de ejecución de la aplicación
from flask import Flask
from flask_restx import Api
from models import db
from api import api_bp
def create_app():
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///flujos.db"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db.init_app(app)
api = Api(app, title="API de Automatización de Flujos", version="1.0", doc="/swagger")
api.add_namespace(api_bp, path="/api/flujos")
with app.app_context():
db.create_all()
return app
if __name__ == "__main__":
app = create_app()
app.run(debug=True)Lanza tu flujo desde la API
Una vez implementado todo el backend, puedes lanzar flujos desde la API REST.
Ejecuta la aplicación y el demonio en dos consolas diferentes para que puedas ver las ejecuciones de cada uno por separado:
python app/demonio.py
El demonio se ejecuta cada diez segundos de forma recurrente a la espera de encontrar registros en estado pendiente.
python app/main.py
En el navegador introduce la siguiente url para acceder al Swagger de la aplicación que acabas de crear.
Ejecutamos el endpoint POST /api/flujos para crear el flujo con el JSON de ejemplo:
{
"carpeta_flujo": "reportes",
"cuerpo_global": {
"fecha": "2025-01-01",
"cantidad_paginas": "12",
"resumen": "Resumen semanal"
}
}
Esta es la respuesta esperada por parte de la API:
Esto crea tres registros en la base de datos, uno por paso. Cada registro incluye:
- UUID.
- Tipo de flujo.
- El contenido con la parametrización de la plantilla correspondiente.
- Nombre del paso.
- Paso del que depende (si aplica).
- Estado:
pendiente, en_curso, finalizado, error
Con esto ya hemos creado el primer flujo. Y solo nos queda esperar que el demonio recupere los pasos y los ejecute de forma sucesiva:
Visualiza los flujos creados
Puedes consultar un flujo completo desde la API en GET /api/flujos/{uuid_flujo}
Y verás un JSON con todos los pasos encadenados, sus estados y dependencias. También puedes consultar todos los flujos agrupados:
Conclusión y posibles siguientes pasos
Este enfoque te permite construir flujos reutilizables, parametrizados y seguros. Puedes expandirlo con:
- UI para lanzar y monitorear flujos.
- Dashboard con métricas.
- Soporte para tareas asíncronas o por cron.
- Logging y alertas.
- Integración con otros servicios externos.
Si tienes tareas repetitivas que aún haces a mano… es hora de automatizarlas.
¡Sigue aprendiendo sobre automatización y más mundo tech en nuestras redes sociales!