10. Команды и обработчик команд (стр. 200-210)
Проблема: события для всего?
В предыдущей главе мы использовали события для всего. Но BatchCreated — это не факт, партия ещё не создана! Мы только хотим её создать.
Решение: разделить команды и события.
Различия между командами и событиями
Команда (Command) — намерение что-то сделать. Посылается одному получателю.
Событие (Event) — факт, который уже произошёл. Рассылается всем слушателям.
Сравнение:
Форма имени:
- Команда: повелительное наклонение —
CreateBatch,Allocate - Событие: прошедшее время —
BatchCreated,Allocated
Обработка ошибок:
- Команда: выбрасываем ошибку пользователю
- Событие: игнорируем ошибку, логируем
Отправка:
- Команда: одному обработчику
- Событие: всем слушателям (многие)
Семантика:
- Команда: намерение на будущее
- Событие: факт о прошлом
Примеры:
- Команда:
Allocate(orderid, sku, qty)— «размести заказ!» - Событие:
OutOfStock(sku)— «товара нет в наличии (уже случилось)»
Создаём команды
# commands.py
class Command:
pass
@dataclass
class Allocate(Command):
orderid: str
sku: str
qty: int
@dataclass
class CreateBatch(Command):
ref: str
sku: str
qty: int
eta: Optional[date] = None
@dataclass
class ChangeBatchQuantity(Command):
ref: str
qty: int
Различия в обработке исключений
Шина сообщений различает команды и события:
# messagebus.py
Message = Union[Command, Event]
def handle(message: Message, uow):
queue = [message]
while queue:
message = queue.pop(0)
if isinstance(message, Event):
handle_event(message, queue, uow)
elif isinstance(message, Command):
handle_command(message, queue, uow)
Обработка событий
def handle_event(event, queue, uow):
for handler in EVENT_HANDLERS[type(event)]:
try:
handler(event, uow=uow)
queue.extend(uow.collect_new_events())
except Exception:
logger.exception('Exception handling event %s', event)
continue # ← Игнорируем ошибку, идём дальше!
Обработка команд
def handle_command(command, queue, uow):
try:
handler = COMMAND_HANDLERS[type(command)] # ← Один обработчик!
result = handler(command, uow=uow)
queue.extend(uow.collect_new_events())
return result
except Exception:
logger.exception('Exception handling command %s', command)
raise # ← Выбрасываем ошибку!
Словари обработчиков
EVENT_HANDLERS = {
OutOfStock: [send_out_of_stock_notification],
}
COMMAND_HANDLERS = {
Allocate: allocate,
CreateBatch: add_batch,
ChangeBatchQuantity: change_batch_quantity,
}
События, команды и обработка ошибок
Вопрос: что если событие не обработалось? Система останется в несогласованном состоянии?
Ответ: нет, потому что:
- Команда изменяет один агрегат атомарно (через UoW)
- События — это побочные эффекты (уведомления, интеграции)
- Успех команды не зависит от успеха обработчиков событий
Пример: VIP-клиенты
# Агрегат
class History:
def record_order(self, order_id: str, amount: int):
self.orders.add(HistoryEntry(order_id, amount))
if len(self.orders) == 3:
self.events.append(CustomerBecameVIP(self.customer_id))
# Обработчик 1: создаёт заказ (ОБЯЗАТЕЛЬНО)
def create_order_from_basket(uow, cmd: CreateOrder):
with uow:
order = Order.from_basket(cmd.customer_id, cmd.basket_items)
uow.orders.add(order)
uow.commit() # → инициирует OrderCreated
# Обработчик 2: обновляет историю (если получится)
def update_customer_history(uow, event: OrderCreated):
with uow:
history = uow.order_history.get(event.customer_id)
history.record_order(event.order_id, event.order_amount)
uow.commit() # → инициирует CustomerBecameVIP
# Обработчик 3: отправляет email (если получится)
def congratulate_vip_customer(uow, event: CustomerBecameVIP):
with uow:
customer = uow.customers.get(event.customer_id)
email.send(customer.email_address, 'Congratulations!')
Сценарии:
create_orderсломался — ❌ Заказ не создан, клиент не получил товарupdate_customer_historyсломался — ✅ Заказ создан, но клиент не стал VIPcongratulate_vip_customerсломался — ✅ Клиент стал VIP, но не получил email
Вывод: команда должна выполниться, события — по возможности.
Синхронное восстановление после ошибок
Проблема: события иногда не обрабатываются (сеть упала, БД заблокирована).
Решение 1: логирование
def handle_event(event, queue, uow):
for handler in EVENT_HANDLERS[type(event)]:
try:
logger.debug('Обработка события %s', event)
handler(event, uow=uow)
except Exception:
logger.exception('Исключение при обработке события %s', event)
continue
Лог:
Обработка события CustomerBecameVIP(customer_id=12345)
обработчиком <function congratulate_vip_customer at 0x10ebc9a60>
Исключение при обработке события CustomerBecameVIP(customer_id=12345)
Решение 2: повторные попытки
from tenacity import Retrying, stop_after_attempt, wait_exponential
def handle_event(event, queue, uow):
for handler in EVENT_HANDLERS[type(event)]:
try:
for attempt in Retrying(
stop=stop_after_attempt(3), # 3 попытки
wait=wait_exponential() # Экспоненциальная задержка
):
with attempt:
handler(event, uow=uow)
queue.extend(uow.collect_new_events())
except RetryError:
logger.error('Не удалось обработить событие %s', event)
continue
Выводы
- Команда = намерение (
CreateBatch), Событие = факт (BatchCreated) - Команды выбрасывают ошибки, события логируют и игнорируют
- Одна команда → один обработчик, одно событие → много обработчиков
- Успех команды не зависит от событий
- Повторные попытки для обработки событий
Вопросы
- В чём разница между командой и событием?
- Почему команды выбрасывают ошибки, а события — нет?
- Зачем нужны повторные попытки для событий?
- Что такое «граница согласованности» в контексте команд?
