Библиотеки Python Часть 2. Практическое применение (страница 3)

Страница 3

print(f"Текущая сумма транзакций по пользователям: {dict(user_totals)}")

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

– Консьюмер читает данные из топика `transactions`.

– Для каждого пользователя обновляется сумма его транзакций в словаре `user_totals`.

– Программа выводит текущие суммы по всем пользователям.

Задача 4: Сохранение обработанных данных в файл

Описание:

Топик `sensor_data` содержит данные с датчиков IoT:

– `sensor_id` – идентификатор датчика.

– `temperature` – измеренная температура.

– `timestamp` – время измерения.

Ваша задача: написать программу, которая сохраняет все данные о температуре выше 30°C в файл `high_temp.json`.

Решение:

```python

from confluent_kafka import Consumer

import json

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'sensor-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['sensor_data'])

# Открываем файл для записи

with open('high_temp.json', 'w') as outfile:

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

sensor_data = json.loads(msg.value().decode('utf-8'))

# Сохраняем данные, если температура выше 30°C

if sensor_data['temperature'] > 30:

json.dump(sensor_data, outfile)

outfile.write('\n') # Новый ряд для каждого объекта

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

– Консьюмер читает данные из топика `sensor_data`.

– Данные с температурой выше 30°C записываются в файл `high_temp.json`.

Задача 5: Обнаружение аномалий в данных

Описание:

В топик `temperature_readings` поступают данные о температуре из различных городов:

– `city` – название города.

– `temperature` – измеренная температура.

– `timestamp` – время измерения.

Ваша задача: написать программу, которая будет находить и выводить аномалии – случаи, когда температура превышает 40°C или опускается ниже -10°C.

Решение:

```python

from confluent_kafka import Consumer

import json

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'temperature-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['temperature_readings'])

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

reading = json.loads(msg.value().decode('utf-8'))

# Проверяем на аномалии

if reading['temperature'] > 40 or reading['temperature'] < -10:

print(f"Аномалия! Город: {reading['city']}, Температура: {reading['temperature']}°C")

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

– Консьюмер читает данные о температуре из топика.

– Если температура выходит за пределы нормального диапазона, программа выводит сообщение об аномалии.

Задача 6: Потоковое объединение данных

Описание:

Есть два топика:

1. `orders` – содержит данные о заказах: `order_id`, `product_id`, `quantity`.

2. `products` – содержит данные о товарах: `product_id`, `product_name`, `price`.

Ваша задача: написать программу, которая объединяет данные из этих двух топиков и выводит итоговую информацию о каждом заказе, включая название продукта и общую стоимость.

Решение:

```python

from confluent_kafka import Consumer

import json

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмеров для обоих топиков

order_consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'order-group',

'auto.offset.reset': 'earliest'

})

product_consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'product-group',

'auto.offset.reset': 'earliest'

})

order_consumer.subscribe(['orders'])

product_consumer.subscribe(['products'])

# Словарь для хранения данных о товарах

product_catalog = {}

try:

while True:

# Чтение данных из топика products

product_msg = product_consumer.poll(0.1)

if product_msg and not product_msg.error():

product = json.loads(product_msg.value().decode('utf-8'))

product_catalog[product['product_id']] = {

'name': product['product_name'],

'price': product['price']

}

# Чтение данных из топика orders

order_msg = order_consumer.poll(0.1)

if order_msg and not order_msg.error():

order = json.loads(order_msg.value().decode('utf-8'))

product_id = order['product_id']

# Объединение данных о заказе и товаре

if product_id in product_catalog:

product = product_catalog[product_id]

total_price = order['quantity'] * product['price']

print(f"Заказ {order['order_id']}: {product['name']} x {order['quantity']} = {total_price} $")

else:

print(f"Информация о товаре {product_id} отсутствует.")

except KeyboardInterrupt:

print("Завершение работы.")

finally:

order_consumer.close()

product_consumer.close()

```

Объяснение:

– Данные из топика `products` кэшируются в словаре `product_catalog`.

– При чтении заказа из топика `orders` программа объединяет данные и вычисляет итоговую стоимость.

Задача 7: Потоковая обработка с вычислением скользящего среднего

Описание:

В топик `stock_prices` поступают данные о ценах акций:

– `symbol` – тикер акции.

– `price` – текущая цена.

– `timestamp` – время.

Ваша задача: вычислять скользящее среднее цены акции за последние 5 сообщений для каждого тикера.

Решение:

```python

from confluent_kafka import Consumer

import json

from collections import defaultdict, deque

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'stocks-group',

'auto.offset.reset': 'earliest'

})

consumer.subscribe(['stock_prices'])

# Дек для хранения последних цен по тикерам

price_window = defaultdict(lambda: deque(maxlen=5))

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

stock_data = json.loads(msg.value().decode('utf-8'))

# Добавляем цену в окно

symbol = stock_data['symbol']

price_window[symbol].append(stock_data['price'])

# Вычисляем скользящее среднее

moving_average = sum(price_window[symbol]) / len(price_window[symbol])

print(f"Скользящее среднее для {symbol}: {moving_average:.2f}")

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

– Используется `deque` для хранения последних 5 цен.

– Скользящее среднее вычисляется как сумма значений, делённая на их количество.

Задача 8: Генерация уведомлений

Описание:

В топик `user_actions` поступают данные о действиях пользователей:

– `user_id` – идентификатор пользователя.

– `action` – выполненное действие (например, "login", "purchase").

Напишите программу, которая отслеживает пользователей, выполнивших вход (`login`), но не совершивших покупку (`purchase`) в течение 10 минут, и отправляет уведомление в топик `notifications`.

Решение:

```python

from confluent_kafka import Consumer, Producer

import json

from datetime import datetime, timedelta

# Настройки Kafka

broker = 'localhost:9092'

# Создание консьюмера

consumer = Consumer({

'bootstrap.servers': broker,

'group.id': 'user-actions-group',

'auto.offset.reset': 'earliest'

})

producer = Producer({'bootstrap.servers': broker})

consumer.subscribe(['user_actions'])

# Словарь для отслеживания пользователей

user_login_time = {}

try:

while True:

msg = consumer.poll(1.0)

if msg is None:

continue

if msg.error():

continue

# Преобразуем сообщение в Python-объект

action = json.loads(msg.value().decode('utf-8'))

user_id = action['user_id']

action_type = action['action']

timestamp = datetime.fromisoformat(action['timestamp'])

if action_type == 'login':

user_login_time[user_id] = timestamp

elif action_type == 'purchase' and user_id in user_login_time:

del user_login_time[user_id]

# Проверяем, прошло ли 10 минут

current_time = datetime.now()

for user, login_time in list(user_login_time.items()):

if current_time – login_time > timedelta(minutes=10):

notification = {'user_id': user, 'message': 'Сделайте покупку!'}

producer.produce('notifications', value=json.dumps(notification))

print(f"Уведомление отправлено для пользователя {user}")

del user_login_time[user]

except KeyboardInterrupt:

print("Завершение работы.")

finally:

consumer.close()

```

Объяснение:

– Время входа пользователей сохраняется в словаре.

– Если с момента входа прошло более 10 минут и покупка не совершена, генерируется уведомление.

Эти задачи показывают, как использовать Apache Kafka для решения реальных задач, таких как фильтрация событий, подсчет статистики, агрегация данных и сохранение обработанной информации. Эти примеры помогут вам освоить основные подходы к работе с потоками данных в реальном времени.

1.3 Работа с базами данных: SQLAlchemy и интеграция с Pandas

SQLAlchemy – это мощная библиотека для работы с базами данных в Python. Она предоставляет инструменты для удобного взаимодействия с реляционными базами данных через ORM (Object Relational Mapping) или с использованием чистого SQL.

Pandas же идеально подходит для анализа данных, но иногда данные, которые мы хотим обработать, хранятся в базах данных. Для этого SQLAlchemy и Pandas можно эффективно интегрировать, чтобы выгружать данные из базы, обрабатывать их в Pandas и сохранять обратно.

Установка и подключение

Для начала работы установите библиотеку SQLAlchemy:

```bash

pip install sqlalchemy

```

Если вы используете SQLite, дополнительных действий не требуется. Для других баз данных, таких как PostgreSQL или MySQL, также потребуется установить драйверы, например:

```bash

pip install psycopg2 # Для PostgreSQL

pip install pymysql # Для MySQL

```

Создайте подключение к базе данных с помощью SQLAlchemy. Например, для SQLite это будет выглядеть так:

```python

from sqlalchemy import create_engine

# Создаем подключение к базе данных SQLite

engine = create_engine('sqlite:///example.db', echo=True)

```

Здесь `echo=True` означает, что в консоль будут выводиться SQL-запросы, выполняемые через SQLAlchemy, что полезно для отладки.

Создание таблиц и работа с ORM

SQLAlchemy поддерживает два основных подхода: работа через ORM и использование SQL-запросов напрямую. Рассмотрим оба.