11. Событийно-управляемая архитектура: использование событий для интеграции микросервисов (стр. 211-225)
Распределённый комок грязи, или Мыслить существительными
Проблема: наивный подход к микросервисам — деление по существительным:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Заказы │───▶│ Партии │───▶│ Склад │
│ (Orders) │ │ (Batches) │ │ (Warehouse) │
└─────────────┘ └─────────────┘ └─────────────┘
▲ │
└──────────────────────────────────────┘
(циклическая зависимость!)
Сценарий 1 (заказ): Заказы → Партии → Склад ✅
Сценарий 2 (брак на складе): Склад → Партии → Заказы ✅
Результат: циклические зависимости, распределённый комок грязи!
Обработка ошибок в распределенных системах
Сценарий: сеть упала во время размещения заказа
Клиент → Заказы → [СБОЙ] → Партии ❓
Проблема: временная связанность (temporal coupling) — все службы должны работать одновременно.
Типы связанности:
Согласованность исполнения — компоненты должны знать порядок операций. Пример: RPC-вызовы.
Согласованность времени — компоненты должны работать одновременно. Пример: синхронные HTTP-вызовы.
Согласованность имени — компоненты согласуют только имя события. Пример: события ✅
Решение: думать глаголами, а не существительными!
Вместо:
- Служба заказов
- Служба партий
Используем:
- Служба размещения заказов
- Служба управления партиями
Новая архитектура:
┌────────────────────────────────────────────────┐
│ Redis Pub/Sub │
│ • change_batch_quantity (входящие) │
│ • line_allocated (исходящие) │
└────────────────────────────────────────────────┘
↓ ↑
Consumer Publisher
↓ ↑
┌────────────────────────────────────────────────┐
│ Служба размещения заказов │
│ • Принимает команды │
│ • Публикует события │
│ • Независима от других служб │
└────────────────────────────────────────────────┘
Преимущества:
- Службы независимы — можно принимать заказы, даже если Партии не работают
- Меньше связанности — легко изменить порядок операций
- Конечная согласованность — всё согласуется со временем
Использование канала «издатель/подписчик» Redis
Сквозной тест
def test_change_batch_quantity_leading_to_reallocation():
# 1. Создаём 2 партии и размещаем заказ в первой
orderid, sku = random_orderid(), random_sku()
earlier_batch = random_batchref('old')
later_batch = random_batchref('newer')
api_client.post_to_add_batch(earlier_batch, sku, qty=10, eta='2011-01-02')
api_client.post_to_add_batch(later_batch, sku, qty=10, eta='2011-01-02')
response = api_client.post_to_allocate(orderid, sku, 10)
assert response.json()['batchref'] == earlier_batch
# 2. Подписываемся на исходящие события
subscription = redis_client.subscribe_to('line_allocated')
# 3. Отправляем команду на уменьшение партии
redis_client.publish_message('change_batch_quantity', {
'batchref': earlier_batch,
'qty': 5 # ← Меньше, чем в заказе!
})
# 4. Ждём событие о переразмещении
for attempt in Retrying(stop=stop_after_delay(3)):
with attempt:
message = subscription.get_message(timeout=1)
if message:
data = json.loads(message['data'])
assert data['orderid'] == orderid
assert data['batchref'] == later_batch # ← Переразмещено!
Redis — ещё один тонкий адаптер
Потребитель событий (входящие):
# entrypoints/redis_eventconsumer.py
r = redis.Redis(**config.get_redis_host_and_port())
def main():
pubsub = r.pubsub(ignore_subscribe_messages=True)
pubsub.subscribe('change_batch_quantity')
for m in pubsub.listen():
handle_change_batch_quantity(m)
def handle_change_batch_quantity(m):
data = json.loads(m['data'])
cmd = commands.ChangeBatchQuantity(
ref=data['batchref'],
qty=data['qty']
)
messagebus.handle(cmd, uow=unit_of_work.SqlAlchemyUnitOfWork())
Издатель событий (исходящие):
# adapters/redis_eventpublisher.py
r = redis.Redis(**config.get_redis_host_and_port())
def publish(channel, event: Event):
r.publish(channel, json.dumps(asdict(event)))
# handlers.py
def publish_allocated_event(event: Allocated, uow):
redis_eventpublisher.publish('line_allocated', event)
Новое событие:
@dataclass
class Allocated(Event):
orderid: str
sku: str
qty: int
batchref: str # ← Где размещено
Модель обновляется:
class Product:
def allocate(self, line: OrderLine) -> str:
batch = next(b for b in sorted(self.batches) if b.can_allocate(line))
batch.allocate(line)
# ← Публикуем событие!
self.events.append(Allocated(
orderid=line.orderid,
sku=line.sku,
qty=line.qty,
batchref=batch.reference
))
return batch.reference
Внутренние события против внешних
Важно: разделяйте внутренние и внешние события!
Внутренние — используются только внутри системы. Пример: OutOfStock.
Внешние — публикуются наружу для других систем. Пример: Allocated.
Почему это важно:
- Внутренние события можно менять без согласования с другими командами
- Внешние события — это публичный API, их изменение требует версии
Выводы
- Делите по глаголам, а не по существительным — службы по бизнес-процессам
- Асинхронные события уменьшают связанность между службами
- Redis Pub/Sub — простой брокер сообщений
- Входящие команды → обработка → Исходящие события
- Конечная согласованность — новая концепция для освоения
- Разделяйте внутренние и внешние события
Вопросы
- Почему деление микросервисов по существительным приводит к проблемам?
- Что такое временная связанность?
- Как Redis Pub/Sub помогает интегрировать службы?
- В чём разница между внутренними и внешними событиями?
- Что такое согласованность имени и почему она лучше согласованности времени?
