Event-Driven архитектура: полное руководство по событийно-ориентированному дизайну
Event-Driven архитектура (EDA) — это архитектурный паттерн, в котором компоненты системы взаимодействуют через производство и потребление событий. Событие — это запись о факте, произошедшем в системе, который имеет значение для бизнеса.
В этой статье мы разберём Event Sourcing, CQRS, шины событий, паттерны обработки, масштабирование и лучшие практики Event-Driven архитектуры для продакшена.
Что такое событие
Событие (Event) — это неизменяемая запись о факте, произошедшем в прошлом.
# Примеры событий
OrderCreated # Заказ был создан
PaymentProcessed # Платёж был обработан
UserVerified # Пользователь был верифицирован
InventoryReserved # Товар был зарезервирован
ShipmentDelivered # Доставка была выполнена
Характеристики хорошего события
Прошедшее время — событие описывает факт, который уже произошёл.
# ✅ Хорошо
OrderCreated
OrderShipped
PaymentFailed
# ❌ Плохо
CreateOrder # Это команда, а не событие
ShipOrder # Это команда
FailPayment # Это команда
Неизменяемость — событие нельзя изменить после создания.
from dataclasses import dataclass
from datetime import datetime
@dataclass(frozen=True) # frozen делает объект неизменяемым
class OrderCreated:
order_id: str
customer_id: str
total: float
timestamp: datetime
items: list
Самодостаточность — событие содержит все данные, необходимые для понимания факта.
# ✅ Хорошо: все данные в событии
@dataclass
class OrderCreated:
order_id: str
customer_id: str
customer_name: str # Денормализация
customer_email: str # Денормализация
total: float
items: list
# ❌ Плохо: нужно идти в БД за данными
@dataclass
class OrderCreated:
order_id: str
# Где данные о заказе?
Архитектурные стили EDA
Notification (Уведомление)
Простейшая форма: события содержат минимум данных, только уведомление об изменении.
Order Service → OrderCreated(order_id) → Event Bus
↓
Inventory Service → забирает order_id → запрашивает Order Service
↓
Email Service → забирает order_id → запрашивает Order Service
Преимущества:
- Простота реализации
- Маленький размер событий
Недостатки:
- Дополнительный запрос к источнику
- Временная связанность
Event-Carried State Transfer (Перенос состояния)
События содержат все данные, необходимые потребителям.
Order Service → OrderCreated(order_id, customer, items, total) → Event Bus
↓
Inventory Service → обновляет свою копию данных без дополнительных запросов
Преимущества:
- Нет дополнительных запросов
- Слабая связанность
Недостатки:
- Больший размер событий
- Дублирование данных
Event Sourcing (Журналирование событий)
Состояние системы определяется как последовательность событий.
Состояние = Событие₁ + Событие₂ + ... + Событиеₙ
# Вместо хранения текущего состояния:
{
"order_id": "123",
"status": "shipped",
"total": 99.99
}
# Храним последовательность событий:
[
{"type": "OrderCreated", "data": {...}},
{"type": "PaymentProcessed", "data": {...}},
{"type": "OrderShipped", "data": {...}}
]
Event Sourcing подробно
Агрегат с событийным хранением
from dataclasses import dataclass, field
from datetime import datetime
from typing import List
from uuid import UUID, uuid4
@dataclass
class OrderCreated:
order_id: UUID
customer_id: UUID
items: list
total: float
timestamp: datetime = field(default_factory=datetime.utcnow)
@dataclass
class PaymentProcessed:
order_id: UUID
payment_id: UUID
amount: float
timestamp: datetime = field(default_factory=datetime.utcnow)
@dataclass
class OrderShipped:
order_id: UUID
tracking_number: str
timestamp: datetime = field(default_factory=datetime.utcnow)
class Order:
def __init__(self, order_id: UUID = None):
self.order_id = order_id or uuid4()
self.customer_id: UUID = None
self.items: list = []
self.total: float = 0
self.status: str = None
self.payment_id: UUID = None
self.tracking_number: str = None
# Неприменённые события
self._pending_events: List = []
# История для восстановления
self._version: int = 0
def create(self, customer_id: UUID, items: list):
"""Создаёт новый заказ"""
if self.status is not None:
raise DomainError("Order already created")
total = sum(item['price'] * item['quantity'] for item in items)
self._apply_event(OrderCreated(
order_id=self.order_id,
customer_id=customer_id,
items=items,
total=total
))
def process_payment(self, payment_id: UUID, amount: float):
"""Обрабатывает платёж"""
if self.status != 'created':
raise DomainError("Can only pay for created order")
if amount != self.total:
raise DomainError("Payment amount must match order total")
self._apply_event(PaymentProcessed(
order_id=self.order_id,
payment_id=payment_id,
amount=amount
))
def ship(self, tracking_number: str):
"""Отправляет заказ"""
if self.status != 'paid':
raise DomainError("Can only ship paid order")
self._apply_event(OrderShipped(
order_id=self.order_id,
tracking_number=tracking_number
))
def _apply_event(self, event):
"""Применяет событие к состоянию"""
if isinstance(event, OrderCreated):
self.customer_id = event.customer_id
self.items = event.items
self.total = event.total
self.status = 'created'
elif isinstance(event, PaymentProcessed):
self.payment_id = event.payment_id
self.status = 'paid'
elif isinstance(event, OrderShipped):
self.tracking_number = event.tracking_number
self.status = 'shipped'
self._pending_events.append(event)
self._version += 1
def get_pending_events(self) -> List:
"""Возвращает неприменённые события для сохранения"""
events = self._pending_events.copy()
self._pending_events.clear()
return events
@classmethod
def load_from_history(cls, order_id: UUID, events: List) -> 'Order':
"""Восстанавливает агрегат из истории событий"""
order = cls(order_id)
for event in events:
order._apply_event(event)
order._pending_events.clear()
return order
Event Store (Хранилище событий)
import json
from datetime import datetime
from typing import List
class EventStore:
def __init__(self, db):
self.db = db
def save(self, aggregate_id: str, events: List, expected_version: int):
"""Сохраняет события с оптимистичной блокировкой"""
for event in events:
self.db.execute("""
INSERT INTO events (
aggregate_id,
aggregate_type,
event_type,
event_data,
version,
timestamp
) VALUES (%s, %s, %s, %s, %s, %s)
""", (
str(aggregate_id),
type(event).__module__,
type(event).__name__,
json.dumps(self._serialize_event(event)),
expected_version + 1,
datetime.utcnow()
))
expected_version += 1
def get_history(self, aggregate_id: str) -> List:
"""Получает всю историю событий агрегата"""
rows = self.db.fetch("""
SELECT event_type, event_data
FROM events
WHERE aggregate_id = %s
ORDER BY version ASC
""", str(aggregate_id))
return [self._deserialize_event(row) for row in rows]
def _serialize_event(self, event):
"""Сериализует событие для хранения"""
return {
'order_id': str(event.order_id),
**{k: v for k, v in event.__dict__.items()
if k != 'order_id' and not k.startswith('_')}
}
def _deserialize_event(self, row):
"""Десериализует событие из хранения"""
event_type = row['event_type']
event_data = json.loads(row['event_data'])
# Динамическое создание объекта события
module_name, class_name = event_type.rsplit('.', 1)
module = __import__(module_name, fromlist=[class_name])
event_class = getattr(module, class_name)
return event_class(**event_data)
Snapshot (Снимки состояния)
Для агрегатов с длинной историей воспроизведение всех событий может быть медленным.
class SnapshotStore:
def __init__(self, db):
self.db = db
def save(self, aggregate_id: str, version: int, state: dict):
"""Сохраняет снимок состояния"""
self.db.execute("""
INSERT INTO snapshots (aggregate_id, version, state, created_at)
VALUES (%s, %s, %s, %s)
ON CONFLICT (aggregate_id)
DO UPDATE SET version = %s, state = %s, created_at = %s
WHERE EXCLUDED.version > snapshots.version
""", (
str(aggregate_id), version, json.dumps(state),
datetime.utcnow(), version, json.dumps(state), datetime.utcnow()
))
def get(self, aggregate_id: str):
"""Получает последний снимок"""
row = self.db.fetchrow("""
SELECT version, state
FROM snapshots
WHERE aggregate_id = %s
ORDER BY version DESC
LIMIT 1
""", str(aggregate_id))
if row:
return row['version'], json.loads(row['state'])
return None, None
# Загрузка агрегата со снимком
def load_order_with_snapshot(order_id: UUID, event_store, snapshot_store):
# Пробуем получить снимок
snapshot_version, snapshot_state = snapshot_store.get(order_id)
if snapshot_state:
# Восстанавливаем из снимка
order = Order.load_from_state(order_id, snapshot_state)
order._version = snapshot_version
# Применяем только события после снимка
events = event_store.get_history_after(order_id, snapshot_version)
for event in events:
order._apply_event(event)
else:
# Загружаем всю историю
events = event_store.get_history(order_id)
order = Order.load_from_history(order_id, events)
return order
CQRS (Command Query Responsibility Segregation)
CQRS — это паттерн разделения операций чтения и записи на разные модели.
Write Model (Command Side)
from dataclasses import dataclass
from typing import Optional
@dataclass
class CreateOrderCommand:
customer_id: str
items: list
shipping_address: str
@dataclass
class ProcessPaymentCommand:
order_id: str
payment_method: str
amount: float
class CreateOrderHandler:
def __init__(self, event_store, event_bus):
self.event_store = event_store
self.event_bus = event_bus
async def handle(self, command: CreateOrderCommand) -> str:
# Создание агрегата
order = Order()
order.create(
customer_id=command.customer_id,
items=command.items
)
# Сохранение событий
events = order.get_pending_events()
await self.event_store.save(order.order_id, events, 0)
# Публикация событий
for event in events:
await self.event_bus.publish(event)
return str(order.order_id)
class ProcessPaymentHandler:
def __init__(self, event_store, event_bus):
self.event_store = event_store
self.event_bus = event_bus
async def handle(self, command: ProcessPaymentCommand):
# Загрузка агрегата
events = await self.event_store.get_history(command.order_id)
order = Order.load_from_history(command.order_id, events)
# Применение команды
order.process_payment(
payment_id=generate_payment_id(),
amount=command.amount
)
# Сохранение событий
events = order.get_pending_events()
await self.event_store.save(
order.order_id,
events,
order._version - len(events)
)
# Публикация
for event in events:
await self.event_bus.publish(event)
Read Model (Query Side)
class OrderReadModel:
def __init__(self, db):
self.db = db
self._setup_tables()
def _setup_tables(self):
"""Создаёт таблицы для чтения"""
self.db.execute("""
CREATE TABLE IF NOT EXISTS orders_read (
order_id UUID PRIMARY KEY,
customer_id UUID,
status VARCHAR(50),
total NUMERIC(10, 2),
created_at TIMESTAMP,
paid_at TIMESTAMP,
shipped_at TIMESTAMP
)
""")
self.db.execute("""
CREATE TABLE IF NOT EXISTS order_items_read (
id SERIAL PRIMARY KEY,
order_id UUID REFERENCES orders_read(order_id),
product_id UUID,
quantity INTEGER,
price NUMERIC(10, 2)
)
""")
async def handle_event(self, event):
"""Обрабатывает события для обновления read model"""
if isinstance(event, OrderCreated):
await self.db.execute("""
INSERT INTO orders_read
(order_id, customer_id, status, total, created_at)
VALUES (%s, %s, 'created', %s, %s)
""", (event.order_id, event.customer_id, event.total, event.timestamp))
for item in event.items:
await self.db.execute("""
INSERT INTO order_items_read
(order_id, product_id, quantity, price)
VALUES (%s, %s, %s, %s)
""", (event.order_id, item['product_id'], item['quantity'], item['price']))
elif isinstance(event, PaymentProcessed):
await self.db.execute("""
UPDATE orders_read
SET status = 'paid', paid_at = %s
WHERE order_id = %s
""", (event.timestamp, event.order_id))
elif isinstance(event, OrderShipped):
await self.db.execute("""
UPDATE orders_read
SET status = 'shipped', shipped_at = %s
WHERE order_id = %s
""", (event.timestamp, event.order_id))
async def get_order(self, order_id: str):
"""Получает заказ с товарами"""
order = await self.db.fetchrow("""
SELECT * FROM orders_read WHERE order_id = %s
""", order_id)
items = await self.db.fetch("""
SELECT * FROM order_items_read WHERE order_id = %s
""", order_id)
return {
'order': dict(order),
'items': [dict(item) for item in items]
}
async def get_customer_orders(self, customer_id: str):
"""Получает все заказы клиента"""
orders = await self.db.fetch("""
SELECT * FROM orders_read
WHERE customer_id = %s
ORDER BY created_at DESC
""", customer_id)
return [dict(order) for order in orders]
Проектные модели (Projections)
class Projection:
def __init__(self, name: str, db):
self.name = name
self.db = db
self._position = 0
async def get_position(self) -> int:
"""Получает позицию последнего обработанного события"""
row = await self.db.fetchrow("""
SELECT position FROM projection_positions
WHERE projection_name = %s
""", self.name)
return row['position'] if row else 0
async def update_position(self, position: int):
"""Обновляет позицию"""
await self.db.execute("""
INSERT INTO projection_positions (projection_name, position)
VALUES (%s, %s)
ON CONFLICT (projection_name)
DO UPDATE SET position = %s
""", (self.name, position, position))
async def process_events(self, events: List):
"""Обрабатывает пакет событий"""
for event in events:
await self.apply_event(event)
await self.update_position(events[-1].position)
class OrderSummaryProjection(Projection):
def __init__(self, db):
super().__init__('order_summary', db)
async def apply_event(self, event):
if isinstance(event, OrderCreated):
await self.db.execute("""
INSERT INTO order_summary
(date, orders_count, total_amount)
VALUES (%s, 1, %s)
ON CONFLICT (date)
DO UPDATE SET
orders_count = order_summary.orders_count + 1,
total_amount = order_summary.total_amount + %s
""", (event.timestamp.date(), event.total, event.total))
class CustomerStatsProjection(Projection):
def __init__(self, db):
super().__init__('customer_stats', db)
async def apply_event(self, event):
if isinstance(event, OrderCreated):
await self.db.execute("""
INSERT INTO customer_stats (customer_id, orders_count, total_spent)
VALUES (%s, 1, %s)
ON CONFLICT (customer_id)
DO UPDATE SET
orders_count = customer_stats.orders_count + 1,
total_spent = customer_stats.total_spent + %s
""", (event.customer_id, event.total, event.total))
Шины событий (Event Bus)
In-Memory Event Bus
import asyncio
from typing import List, Callable
from collections import defaultdict
class InMemoryEventBus:
def __init__(self):
self._subscribers: dict = defaultdict(list)
self._lock = asyncio.Lock()
def subscribe(self, event_type: type, handler: Callable):
"""Подписывает обработчик на тип события"""
self._subscribers[event_type].append(handler)
async def publish(self, event):
"""Публикует событие всем подписчикам"""
handlers = self._subscribers.get(type(event), [])
# Параллельное выполнение обработчиков
await asyncio.gather(
*[handler(event) for handler in handlers],
return_exceptions=True
)
async def publish_sequential(self, event):
"""Последовательная публикация (для транзакций)"""
handlers = self._subscribers.get(type(event), [])
for handler in handlers:
await handler(event)
# Использование
event_bus = InMemoryEventBus()
# Подписка
event_bus.subscribe(OrderCreated, send_confirmation_email)
event_bus.subscribe(OrderCreated, reserve_inventory)
event_bus.subscribe(PaymentProcessed, update_accounting)
# Публикация
await event_bus.publish(OrderCreated(
order_id=uuid4(),
customer_id=customer_id,
items=items,
total=total
))
Redis Event Bus
import json
import redis.asyncio as redis
class RedisEventBus:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def publish(self, event):
"""Публикует событие в Redis Pub/Sub"""
event_data = {
'type': type(event).__name__,
'data': self._serialize(event)
}
await self.redis.publish(
f'events:{type(event).__name__}',
json.dumps(event_data)
)
async def subscribe(self, event_types: List[type], handler):
"""Подписывается на события"""
pubsub = self.redis.pubsub()
for event_type in event_types:
await pubsub.subscribe(f'events:{event_type.__name__}')
async for message in pubsub.listen():
if message['type'] == 'message':
event_data = json.loads(message['data'])
event = self._deserialize(event_data)
await handler(event)
def _serialize(self, event):
return {k: v for k, v in event.__dict__.items() if not k.startswith('_')}
def _deserialize(self, data):
# Динамическое создание объекта
pass
# Использование в микросервисах
redis_client = redis.Redis(host='localhost', port=6379)
event_bus = RedisEventBus(redis_client)
# Сервис 1: публикация
await event_bus.publish(OrderCreated(order_id=uuid4(), ...))
# Сервис 2: подписка
async def handle_order_created(event):
print(f"New order: {event.order_id}")
await event_bus.subscribe([OrderCreated], handle_order_created)
Kafka Event Bus
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
class KafkaEventBus:
def __init__(self, bootstrap_servers: List[str]):
self.producer = AIOKafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v.__dict__).encode(),
key_serializer=lambda k: k.encode() if k else None
)
self.consumer = None
async def start(self):
await self.producer.start()
async def stop(self):
await self.producer.stop()
if self.consumer:
await self.consumer.stop()
async def publish(self, event, partition_key: str = None):
"""Публикует событие в Kafka"""
topic = f'events.{type(event).__name__}'
await self.producer.send_and_wait(
topic=topic,
value=event,
key=partition_key
)
async def subscribe(self, topics: List[str], group_id: str, handler):
"""Подписывается на топики Kafka"""
self.consumer = AIOKafkaConsumer(
*topics,
bootstrap_servers='localhost:9092',
group_id=group_id,
value_deserializer=lambda m: json.loads(m.decode()),
auto_offset_reset='earliest'
)
await self.consumer.start()
try:
async for msg in self.consumer:
event_data = msg.value
event = self._deserialize(event_data)
await handler(event)
await self.consumer.commit()
finally:
await self.consumer.stop()
def _deserialize(self, data):
# Восстановление объекта события
pass
# Использование
event_bus = KafkaEventBus(['localhost:9092'])
await event_bus.start()
# Публикация
await event_bus.publish(
OrderCreated(order_id='123', customer_id='456', ...),
partition_key='customer-456' # Для порядка событий клиента
)
# Подписка
async def process_order_event(event):
await handle_order(event)
await event_bus.subscribe(
['events.OrderCreated', 'events.OrderShipped'],
group_id='order-processor',
handler=process_order_event
)
Паттерны обработки событий
Event Filtering (Фильтрация)
class EventFilter:
def __init__(self, condition):
self.condition = condition
def matches(self, event) -> bool:
return self.condition(event)
# Фильтры
high_value_filter = EventFilter(
lambda e: isinstance(e, OrderCreated) and e.total > 1000
)
vip_customer_filter = EventFilter(
lambda e: isinstance(e, OrderCreated) and e.customer_id in VIP_CUSTOMERS
)
# Обработчик с фильтрами
class FilteredHandler:
def __init__(self):
self.filters = []
def add_filter(self, event_type: type, filter: EventFilter, handler):
self.filters.append((event_type, filter, handler))
async def handle(self, event):
for event_type, filter, handler in self.filters:
if isinstance(event, event_type) and filter.matches(event):
await handler(event)
Event Aggregation (Агрегация событий)
import asyncio
from collections import deque
class EventAggregator:
def __init__(self, window_seconds: int = 60):
self.window_seconds = window_seconds
self.events = deque()
self.lock = asyncio.Lock()
async def add_event(self, event):
async with self.lock:
self.events.append((event, asyncio.get_event_loop().time()))
self._cleanup_old_events()
async def get_aggregated(self) -> dict:
async with self.lock:
self._cleanup_old_events()
return {
'count': len(self.events),
'total': sum(e.total for e, _ in self.events if hasattr(e, 'total')),
'events': [e for e, _ in self.events]
}
def _cleanup_old_events(self):
now = asyncio.get_event_loop().time()
while self.events and now - self.events[0][1] > self.window_seconds:
self.events.popleft()
# Использование
aggregator = EventAggregator(window_seconds=60)
# Добавление событий
await aggregator.add_event(OrderCreated(...))
await aggregator.add_event(OrderCreated(...))
# Получение агрегированных данных
summary = await aggregator.get_aggregated()
print(f"Orders in last minute: {summary['count']}, Total: {summary['total']}")
Event Chaining (Цепочки событий)
class EventChain:
def __init__(self):
self.chain = []
def when(self, event_type: type):
"""Условие: когда произошло событие"""
def decorator(handler):
self.chain.append(('when', event_type, handler))
return self
return decorator
def then_publish(self, event_factory):
"""Действие: опубликовать новое событие"""
self.chain.append(('then_publish', event_factory))
return self
async def process(self, event, event_bus):
"""Обрабатывает цепочку"""
for step_type, *args in self.chain:
if step_type == 'when':
event_type, handler = args
if isinstance(event, event_type):
await handler(event)
elif step_type == 'then_publish':
event_factory = args[0]
new_event = event_factory(event)
await event_bus.publish(new_event)
# Пример цепочки
chain = EventChain()
chain.when(OrderCreated).then_publish(
lambda e: InventoryReserved(
order_id=e.order_id,
items=e.items
)
).when(InventoryReserved).then_publish(
lambda e: PaymentRequested(
order_id=e.order_id
)
)
# Обработка
await chain.process(event, event_bus)
Масштабирование
Partitioning по агрегатам
class PartitionedEventProcessor:
def __init__(self, num_partitions: int = 10):
self.num_partitions = num_partitions
self.processors = {}
def get_partition(self, aggregate_id: str) -> int:
"""Определяет партицию по ID агрегата"""
return hash(aggregate_id) % self.num_partitions
async def process_event(self, event):
"""Обрабатывает событие в правильной партиции"""
partition = self.get_partition_key(event)
if partition not in self.processors:
self.processors[partition] = EventProcessor(partition)
await self.processors[partition].process(event)
def get_partition_key(self, event) -> str:
"""Получает ключ партиционирования из события"""
if hasattr(event, 'order_id'):
return event.order_id
if hasattr(event, 'customer_id'):
return event.customer_id
return 'default'
Consumer Groups
# Kafka consumer groups для параллельной обработки
from aiokafka import AIOKafkaConsumer
async def create_consumer_group(group_id: str, topics: List[str]):
consumer = AIOKafkaConsumer(
*topics,
bootstrap_servers='localhost:9092',
group_id=group_id,
# Автоматическое распределение партиций между участниками группы
partition_assignment_strategy='roundrobin'
)
await consumer.start()
return consumer
# Запуск нескольких инстансов одного процессора
# Kafka автоматически распределит партиции между ними
Best Practices
Идемпотентность обработчиков
class IdempotentHandler:
def __init__(self, db):
self.db = db
async def handle(self, event):
event_id = self._get_event_id(event)
# Проверка: уже обрабатывали?
exists = await self.db.fetchval("""
SELECT 1 FROM processed_events WHERE event_id = %s
""", event_id)
if exists:
return # Уже обработано
# Обработка
await self.process_event(event)
# Запись: обработано
await self.db.execute("""
INSERT INTO processed_events (event_id, processed_at)
VALUES (%s, %s)
""", (event_id, datetime.utcnow()))
def _get_event_id(self, event) -> str:
"""Уникальный ID события для дедупликации"""
return f"{type(event).__name__}:{event.order_id}:{event.timestamp}"
Порядок событий
class OrderedEventProcessor:
def __init__(self):
self.expected_versions = {} # aggregate_id -> expected_version
async def process(self, event):
aggregate_id = event.aggregate_id
current_version = event.version
expected = self.expected_versions.get(aggregate_id, 0)
if current_version != expected + 1:
# Нарушен порядок
await self.handle_out_of_order(event)
return
await self.process_event(event)
self.expected_versions[aggregate_id] = current_version
async def handle_out_of_order(self, event):
"""Обработка событий вне порядка"""
# Буферизация до получения пропущенных событий
# Или запрос пропущенных событий из Event Store
pass
Мониторинг
from prometheus_client import Counter, Histogram
# Метрики
events_processed = Counter(
'events_processed_total',
'Total events processed',
['event_type', 'status']
)
event_processing_time = Histogram(
'event_processing_seconds',
'Event processing time',
['event_type']
)
events_lag = Gauge(
'events_lag',
'Events processing lag',
['consumer_group']
)
# Использование
@event_processing_time.labels(event_type='OrderCreated').time()
async def process_event(event):
try:
await handle(event)
events_processed.labels(event_type='OrderCreated', status='success').inc()
except Exception:
events_processed.labels(event_type='OrderCreated', status='error').inc()
raise
Заключение
Event-Driven архитектура — это мощный подход для построения распределённых систем:
- Слабая связанность — сервисы не зависят друг от друга
- Масштабируемость — независимая обработка событий
- Отказоустойчивость — события сохраняются и могут быть обработаны позже
- Аудит — полная история изменений через события
- Гибкость — легко добавлять новых потребителей событий
Используйте EDA, когда:
- Сложная доменная логика с множеством бизнес-процессов
- Нужна полная история изменений
- Требуется слабая связанность между сервисами
- Высокие требования к масштабируемости
Избегайте EDA, когда:
- Простое CRUD приложение
- Нужна строгая согласованность в реальном времени
- Нет ресурсов на поддержку инфраструктуры
- Команда не готова к eventual consistency