Our next-generation personalized content generation service performed impressively at the prototype stage, but once deployed to the pre-production environment, P99 inference latency shot up to an unacceptable 3 seconds. The root cause was identified quickly: the model needs the user's latest behavioral features in real time as part of the prompt, while our existing feature platform is a Hadoop/Spark batch pipeline whose T+1 data freshness is fatal for the real-time, interactive nature of generative AI. Worse, the entire deployment process relied on a few scattered README.md files and a pile of manually executed shell scripts; every environment migration came with configuration errors and omissions, and the operational cost was enormous.
Our goals were clear:
- Build a low-latency real-time feature store that lets the model fetch features within 10 milliseconds.
- Make deployment of the entire AI service (data backend plus model inference API) fully automated and idempotent.
Technology Choices
Under pressure, we quickly evaluated a few options. For the real-time feature store, Redis was a candidate, but given how fast feature data can grow, the cost and persistence complexity of a pure in-memory solution made us hesitate. We ultimately chose ScyllaDB: it is Cassandra API compatible, but its rewrite in C++ on the Seastar framework delivers very low latency and high throughput, a good fit for a feature store with frequent reads and writes. Its persistence and scalability also beat Redis for this use case.
For automated deployment, we abandoned the idea of continuing to maintain shell scripts. Terraform excels at creating and destroying infrastructure, but offers less fine-grained control over application configuration and deployment details. Our team knows Python well, yet we did not want to sink into building a deployment framework from scratch. Ansible's agentless architecture and declarative YAML syntax turned out to be the best fit: its idempotency guarantees that re-running a playbook produces no side effects, and its rich module ecosystem covers everything from system configuration and package installation to service management.
On the model side, we kept our Keras-based text generation model and wrapped it as an HTTP API with a lightweight Flask application. With that, the blueprint of the architecture was settled.
graph TD
    subgraph "Ansible Control Node"
        A[ansible-playbook site.yml]
    end
    subgraph "Managed Nodes"
        B(ScyllaDB Node)
        C(Keras App Node)
    end
    A -- "Role: scylladb" --> B
    A -- "Role: keras_app" --> C
    subgraph "Inference Flow"
        D[Client Request] --> E{Keras Inference API}
        E -- "1. Fetch features (user_id)" --> F[(ScyllaDB Feature Store)]
        F -- "2. Return features" --> E
        E -- "3. Generate & Return Content" --> D
    end
    C --> F
ScyllaDB Feature Store Table Design
Our features are simple: mostly a user's recent preferences and a summary of historical behavior. To keep reads fast, user_id is the natural choice of partition key.
-- cqlsh -u cassandra -p cassandra <scylla_host> -f setup_keyspace.cql
-- Create a keyspace dedicated to the feature store
-- SimpleStrategy is enough in a single-datacenter setup; a replication factor of 3 gives basic high availability
CREATE KEYSPACE IF NOT EXISTS feature_store
WITH REPLICATION = {
  'class': 'SimpleStrategy',
  'replication_factor': 1 -- 1 in our test environment; production should use at least 3
};
USE feature_store;
-- Real-time user features table
-- user_id is the partition key, so all of a user's data lands on one node and lookups stay fast
-- feature_name is the clustering key, so we can query one specific feature or all features for a user
CREATE TABLE IF NOT EXISTS user_realtime_features (
  user_id text,
  feature_name text,
  feature_value text,
  updated_at timestamp,
  PRIMARY KEY (user_id, feature_name)
) WITH CLUSTERING ORDER BY (feature_name ASC);
-- Insert some sample rows for testing
INSERT INTO user_realtime_features (user_id, feature_name, feature_value, updated_at) VALUES ('user-123', 'last_viewed_category', 'tech', toTimestamp(now()));
INSERT INTO user_realtime_features (user_id, feature_name, feature_value, updated_at) VALUES ('user-123', 'preferred_author', 'Dr. Hinton', toTimestamp(now()));
INSERT INTO user_realtime_features (user_id, feature_name, feature_value, updated_at) VALUES ('user-456', 'last_viewed_category', 'finance', toTimestamp(now()));
This CQL script, which defines our data model, becomes part of the Ansible automation below.
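Before wiring this into the service, it is worth sanity-checking the read path against our 10 ms budget. Below is a minimal probe, a sketch assuming the cassandra-driver package, the sample rows above, and the ScyllaDB host from our inventory; the timing includes the network round-trip, so treat it as a rough upper bound rather than a benchmark.

import time
from cassandra.cluster import Cluster

# Connect to the feature_store keyspace (host taken from inventory/production.ini).
cluster = Cluster(['192.168.1.10'])
session = cluster.connect('feature_store')

# A prepared statement avoids re-parsing the query on every call;
# this is the exact partition-key point read the inference API will issue.
stmt = session.prepare(
    "SELECT feature_name, feature_value FROM user_realtime_features WHERE user_id = ?"
)

start = time.perf_counter()
rows = session.execute(stmt, ['user-123'])
features = {row.feature_name: row.feature_value for row in rows}
elapsed_ms = (time.perf_counter() - start) * 1000

print(f"fetched {features} in {elapsed_ms:.2f} ms")
cluster.shutdown()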
Ansible Project Structure
A maintainable Ansible project needs a clear structure. We follow the roles best practice and keep the deployment logic for ScyllaDB and the Keras application fully decoupled.
.
├── inventory/
│ └── production.ini # host inventory
├── group_vars/
│ └── all.yml # global variables
├── roles/
│ ├── scylladb/
│ │ ├── tasks/
│ │ │ └── main.yml
│ │ ├── handlers/
│ │ │ └── main.yml
│ │ ├── templates/
│ │ │ └── scylla.yaml.j2
│ │ └── files/
│ │ └── setup_keyspace.cql
│ └── keras_app/
│ ├── tasks/
│ │ └── main.yml
│ ├── handlers/
│ │ └── main.yml
│ ├── templates/
│ │ ├── keras_app.service.j2
│ │ └── config.py.j2
│ └── files/
│ ├── requirements.txt
│ └── app/ # Flask app source code
│ ├── app.py
│ └── model/ # Pre-trained Keras model
└── site.yml # main playbook
inventory/production.ini:
[scylla_nodes]
db01.example.com ansible_host=192.168.1.10
[keras_app_servers]
app01.example.com ansible_host=192.168.1.20
group_vars/all.yml:
# Global variables, such as the shared remote user
ansible_user: admin
scylla_version: "5.2"
# Seed nodes of the ScyllaDB cluster; for a single-node cluster, the node itself
scylla_seeds: "192.168.1.10"
Role: scylladb - Automating the ScyllaDB Deployment
This role's core job is to install, configure, and start the ScyllaDB service, and to initialize the database schema.
roles/scylladb/tasks/main.yml:
- name: Add ScyllaDB repository key
  ansible.builtin.get_url:
    url: "https://keys.scylladb.com/enterprise/GPG-KEY-Scylla-Enterprise"
    dest: /etc/apt/trusted.gpg.d/scylla-enterprise.gpg
    mode: '0644'

- name: Add ScyllaDB repository
  ansible.builtin.apt_repository:
    repo: "deb [arch=amd64 signed-by=/etc/apt/trusted.gpg.d/scylla-enterprise.gpg] https://enterprise-deb.scylladb.com/ubuntu-jammy/v1 {{ scylla_version }} stable"
    state: present
    filename: scylla-enterprise

- name: Install ScyllaDB and Python driver
  ansible.builtin.apt:
    name:
      - "scylla-enterprise-{{ scylla_version }}"
      - python3-scylla-driver # for cqlsh script execution
    state: present
    update_cache: yes
  notify: Restart scylla-server

- name: Configure ScyllaDB
  ansible.builtin.template:
    src: scylla.yaml.j2
    dest: /etc/scylla/scylla.yaml
    owner: scylla
    group: scylla
    mode: '0644'
  notify: Restart scylla-server

- name: Ensure scylla-server is started and enabled
  ansible.builtin.service:
    name: scylla-server
    state: started
    enabled: yes

# flush_handlers applies any pending restart now, so ScyllaDB is running
# with the latest configuration before we try to connect to it
- name: Flush handlers to apply restart if necessary
  ansible.builtin.meta: flush_handlers

- name: Wait for CQL port to be available
  ansible.builtin.wait_for:
    host: "{{ ansible_host }}"
    port: 9042
    delay: 10
    timeout: 300
    state: started

# The script must be copied before the task that executes it
- name: Copy CQL script to remote
  ansible.builtin.copy:
    src: setup_keyspace.cql
    dest: /tmp/setup_keyspace.cql
    mode: '0644'

- name: Initialize Keyspace and Tables
  ansible.builtin.command: "cqlsh {{ ansible_host }} -f /tmp/setup_keyspace.cql"
  args:
    creates: "/var/lib/scylla/data/feature_store" # a simple idempotency marker
  when: ansible_host == scylla_seeds.split(',')[0] # only run on the first seed node
  become: no
roles/scylladb/templates/scylla.yaml.j2:
# This is a minimal configuration for a single node cluster.
cluster_name: 'FeatureStoreCluster'
seed_provider:
  - class_name: org.apache.cassandra.locator.SimpleSeedProvider
    parameters:
      - seeds: "{{ scylla_seeds }}"
listen_address: {{ ansible_host }}
rpc_address: {{ ansible_host }}
endpoint_snitch: GossipingPropertyFileSnitch
# ... other ScyllaDB parameters ...
roles/scylladb/handlers/main.yml:
- name: Restart scylla-server
  ansible.builtin.service:
    name: scylla-server
    state: restarted
The notify/handler mechanism here guarantees that ScyllaDB is restarted only when the configuration file actually changes, a key idempotency practice.
Integrating the Keras Inference Service with ScyllaDB
The model's inference code needs to connect to ScyllaDB. Below is the core logic of the Flask application.
roles/keras_app/files/app/app.py:
import os
import logging

from flask import Flask, request, jsonify
from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy
import tensorflow as tf

# Basic logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- App Initialization ---
app = Flask(__name__)

# --- ScyllaDB Connection ---
# Configuration is loaded from environment variables, which Ansible will set.
SCYLLA_HOSTS = os.getenv('SCYLLA_HOSTS', '127.0.0.1').split(',')
SCYLLA_KEYSPACE = 'feature_store'

try:
    # In production, use a load balancing policy.
    policy = DCAwareRoundRobinPolicy(local_dc=os.getenv('SCYLLA_DC', 'DC1'))
    cluster = Cluster(SCYLLA_HOSTS, load_balancing_policy=policy)
    session = cluster.connect(SCYLLA_KEYSPACE)
    logging.info(f"Successfully connected to ScyllaDB at {SCYLLA_HOSTS}")
    # Prepare the statement once at startup for performance
    GET_FEATURES_STMT = session.prepare(
        "SELECT feature_name, feature_value FROM user_realtime_features WHERE user_id = ?"
    )
except Exception as e:
    logging.error(f"Failed to connect to ScyllaDB: {e}")
    # In a real app, this might trigger a health check failure.
    session = None

# --- Keras Model Loading ---
try:
    # This is a placeholder for a real generative model.
    # In a real project, this would be a more complex model loaded from a file.
    model = tf.keras.models.load_model('./model/my_text_generator')
    logging.info("Keras model loaded successfully.")
except Exception as e:
    logging.error(f"Failed to load Keras model: {e}")
    model = None

# --- API Endpoint ---
@app.route('/generate', methods=['POST'])
def generate_content():
    if not model or not session:
        return jsonify({"error": "Service is not ready"}), 503

    data = request.get_json()
    user_id = data.get('user_id')
    base_prompt = data.get('prompt', '')
    if not user_id:
        return jsonify({"error": "user_id is required"}), 400

    # 1. Fetch real-time features from ScyllaDB
    try:
        rows = session.execute(GET_FEATURES_STMT, [user_id])
        user_features = {row.feature_name: row.feature_value for row in rows}
    except Exception as e:
        logging.error(f"Error fetching features for user {user_id}: {e}")
        # Fallback: proceed without personalization
        user_features = {}

    # 2. Construct a personalized prompt
    feature_prompt = ". ".join([f"{key} is {value}" for key, value in user_features.items()])
    personalized_prompt = f"User profile: {feature_prompt}. Task: {base_prompt}"
    logging.info(f"Generated prompt for user {user_id}: {personalized_prompt}")

    # 3. Use the Keras model to generate text
    # This is simplified generation logic; a real implementation would
    # involve tokenization and a generation loop.
    try:
        # Dummy generation for demonstration
        generated_text = f"Response for {user_id} based on prompt: '{personalized_prompt}'"
        # real_output = model.predict(...)
    except Exception as e:
        logging.error(f"Model inference failed for user {user_id}: {e}")
        return jsonify({"error": "Model inference failed"}), 500

    return jsonify({"user_id": user_id, "generated_text": generated_text})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Role: keras_app - Automating the Inference Service Deployment
This role deploys the Python application and manages it as a systemd service, ensuring stable operation and start-on-boot.
roles/keras_app/tasks/main.yml:
- name: Install system dependencies
  ansible.builtin.apt:
    name: ['python3-pip', 'python3-venv']
    state: present

- name: Create application user
  ansible.builtin.user:
    name: kerasapp
    shell: /bin/bash
    create_home: yes

- name: Deploy application code
  ansible.builtin.copy:
    src: app/
    dest: /home/kerasapp/app
    owner: kerasapp
    group: kerasapp
    mode: '0755'

- name: Deploy requirements file
  ansible.builtin.copy:
    src: requirements.txt
    dest: /home/kerasapp/requirements.txt
    owner: kerasapp
    group: kerasapp

- name: Create Python virtual environment and install dependencies
  ansible.builtin.pip:
    requirements: /home/kerasapp/requirements.txt
    virtualenv: /home/kerasapp/venv
    virtualenv_command: python3 -m venv
  become: yes
  become_user: kerasapp

- name: Create application config from template
  ansible.builtin.template:
    src: config.py.j2
    dest: /home/kerasapp/app/config.py
    owner: kerasapp
    group: kerasapp
    mode: '0644'

- name: Setup systemd service
  ansible.builtin.template:
    src: keras_app.service.j2
    dest: /etc/systemd/system/keras_app.service
  notify:
    - Reload systemd
    - Restart keras_app

- name: Ensure keras_app service is started and enabled
  ansible.builtin.service:
    name: keras_app
    state: started
    enabled: yes
roles/keras_app/templates/keras_app.service.j2:
[Unit]
Description=Keras Generative AI Inference Service
After=network.target
[Service]
User=kerasapp
Group=kerasapp
WorkingDirectory=/home/kerasapp/app
# Pass ScyllaDB host info via environment variables
Environment="SCYLLA_HOSTS={{ scylla_seeds }}"
ExecStart=/home/kerasapp/venv/bin/gunicorn --workers 3 --bind 0.0.0.0:5000 app:app
Restart=always
[Install]
WantedBy=multi-user.target
roles/keras_app/handlers/main.yml:
- name: Reload systemd
  ansible.builtin.systemd:
    daemon_reload: yes

- name: Restart keras_app
  ansible.builtin.service:
    name: keras_app
    state: restarted
Running under systemd gives our Python application production-grade supervision, logging, and restart behavior.
The Final Orchestration Playbook
site.yml strings all the roles together into one complete deployment flow:
---
- name: Deploy ScyllaDB Feature Store Cluster
  hosts: scylla_nodes
  become: yes
  roles:
    - scylladb

- name: Deploy Keras Inference Service
  hosts: keras_app_servers
  become: yes
  roles:
    - keras_app
Run ansible-playbook -i inventory/production.ini site.yml and the entire system goes from bare machines to serving traffic in a single pass.
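A quick end-to-end smoke test confirms the result. This is a minimal client sketch, assuming the requests package, the app host from our inventory, and the sample user inserted by setup_keyspace.cql:

import requests

# POST a generation request for a user whose features exist in ScyllaDB.
resp = requests.post(
    "http://192.168.1.20:5000/generate",
    json={"user_id": "user-123", "prompt": "Write a short article intro"},
    timeout=5,
)
resp.raise_for_status()
print(resp.json()["generated_text"])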
Limitations and Future Iterations
This setup solved our original pain points: low-latency feature retrieval and automated deployment. In a real, large-scale production environment, however, several areas still need to evolve.
First, the current ScyllaDB deployment is a single node, which falls short on high availability. The next step is to extend the Ansible playbook to multi-node cluster deployment, which involves dynamically generating the seeds list, handling inter-node discovery, and more complex network configuration.
Second, the feature write path (the ETL side) was not covered here. A complete real-time feature platform also needs a data pipeline that syncs changes from upstream business databases into ScyllaDB in real time, via CDC (e.g. Debezium) or stream processing (e.g. Flink/Kafka Streams); the sink side of such a pipeline is sketched below.
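Whichever pipeline we choose, its sink ultimately reduces to an idempotent upsert against the table defined earlier. A minimal sketch of that consumer-side write, assuming the cassandra-driver package; upsert_feature is a hypothetical handler that a CDC or stream consumer would invoke per change event:

import datetime
from cassandra.cluster import Cluster

cluster = Cluster(['192.168.1.10'])
session = cluster.connect('feature_store')

# The (user_id, feature_name) primary key makes writes idempotent upserts:
# replaying the same change event simply overwrites the row.
UPSERT_STMT = session.prepare(
    "INSERT INTO user_realtime_features (user_id, feature_name, feature_value, updated_at) "
    "VALUES (?, ?, ?, ?)"
)

def upsert_feature(user_id: str, name: str, value: str) -> None:
    # Hypothetical per-event handler called by the CDC/stream consumer.
    session.execute(UPSERT_STMT, [user_id, name, value, datetime.datetime.utcnow()])

upsert_feature('user-123', 'last_viewed_category', 'sports')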
Finally, as the service scales, containerizing the Keras application and managing it on Kubernetes is the natural next move. Ansible can keep playing a role in that evolution, for example preparing the base environment on K8s worker nodes or managing in-cluster applications through an Ansible Operator. The current systemd approach, while stable and reliable for VM deployments, lacks the elasticity and orchestration capabilities of the cloud-native ecosystem.