基于 Elixir Port 构建与 Python Matplotlib 通信的异构数据可视化服务


我们的一个核心业务系统是用 Elixir 构建的,它处理着大量的实时遥测数据。最近出现一个需求:为运营团队提供按需生成的复杂时序数据分析图表。这些图表不是简单线图,需要包含统计学标记、多轴、复杂的注释,这些是 Matplotlib 的强项,但用前端 JS 库(如 D3.js 或 Chart.js)在浏览器中实时渲染不仅性能堪忧,还需要将大量原始数据点传输到客户端,这在网络和安全上都是不可接受的。

方案必须是服务器端渲染。但问题是,我们的技术栈是 Elixir/BEAM,一个不以科学计算和绘图见长的平台。而 Python 及其生态(NumPy, Matplotlib)是这个领域的绝对王者。

第一个进入脑海的方案是,用 Python Flask/FastAPI 搭建一个专门的绘图微服务,Elixir 通过 HTTP 调用它。这个方案可行,但引入了额外的部署、运维和网络开销。每一次绘图请求都意味着一次完整的 TCP 握手、HTTP 解析、序列化和反序列化。对于一个要求低延迟、高并发的内部服务,这个开销显得有些笨重。我们真正需要的,是一种更轻量、更受控的跨语言进程通信方式。

Elixir 的 Port 机制提供了一个完美的答案。它允许 BEAM 虚拟机与外部 OS 进程直接通过标准输入/输出进行交互,开销极低,并且能与 OTP 的监督树(Supervision Tree)无缝集成。这让我们能像管理一个普通 Elixir 进程一样,管理一个外部 Python 进程的生命周期。

最终的技术决策是:

  1. Elixir/Phoenix: 作为主服务,处理 API 请求。
  2. Elixir Port: 负责启动并管理一个长期运行的 Python 绘图脚本。
  3. Python/Matplotlib: 作为 Elixir 控制下的一个子进程,负责接收绘图指令和数据,生成图表图片。
  4. DynamoDB: 存储时序数据,由 Elixir 服务层查询。
  5. Vite/React: 前端,用于展示由后端生成的图表。

这种架构的整体数据流如下:

sequenceDiagram
    participant FE as Vite/React Frontend
    participant BE as Elixir/Phoenix Backend
    participant DB as DynamoDB
    participant PY as Python/Matplotlib Port

    FE->>+BE: GET /api/charts/device-123?range=24h
    BE->>+DB: Query(device-123, last 24h)
    DB-->>-BE: Time-series data
    BE->>+PY: Send plot command + data via Port
    PY-->>PY: Generate plot with Matplotlib
    PY-->>-BE: Return PNG binary data via Port
    BE-->>-FE: Response with Content-Type: image/png

第一步:构建健壮的 Python 绘图 Worker

这个 Python 脚本不是一次性的,它必须是一个长期运行的守护进程,通过 stdin 循环接收任务,并将结果输出到 stdout。一个常见的错误是让 Python 脚本处理完一个请求就退出,这会导致频繁的进程创建和销毁开销,完全违背了使用 Port 的初衷。

另一个关键点是通信协议。直接通过 stdin/stdout 交换原始 JSON 字符串是不可靠的,因为我们无法确定一个数据块的边界,特别是当数据中包含换行符时。更严重的是,我们需要传输二进制图片数据。

一个在生产环境中稳定可靠的方案是采用基于长度前缀的二进制协议。每次发送消息时,先发送一个固定长度(例如4个字节)的头部,表示后续消息体的长度,然后再发送消息体。

plot_worker.py:

import sys
import json
import struct
import traceback
import io
import logging

import matplotlib
matplotlib.use('Agg') # Crucial for non-GUI environments
import matplotlib.pyplot as plt
import numpy as np

# --- 日志配置,输出到 stderr 以免污染 stdout ---
logging.basicConfig(
    stream=sys.stderr, 
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# --- 核心通信协议函数 ---
def read_message():
    """从 stdin 读取一个消息"""
    try:
        # 读取4字节的长度前缀
        header = sys.stdin.buffer.read(4)
        if not header:
            # 如果Elixir Port关闭,这里会读到空,是进程正常退出的信号
            logging.info("Input stream closed. Exiting.")
            sys.exit(0)
        
        msg_len = struct.unpack('>I', header)[0]
        # 读取指定长度的消息体
        body = sys.stdin.buffer.read(msg_len)
        return json.loads(body.decode('utf-8'))
    except (struct.error, json.JSONDecodeError) as e:
        logging.error(f"Failed to read or decode message: {e}")
        return None

def send_message(data):
    """向 stdout 发送一个消息"""
    try:
        if isinstance(data, str):
            # 发送JSON格式的错误/状态消息
            encoded_body = json.dumps({"status": "error", "message": data}).encode('utf-8')
        else:
            # 发送二进制图片数据
            encoded_body = data

        header = struct.pack('>I', len(encoded_body))
        sys.stdout.buffer.write(header)
        sys.stdout.buffer.write(encoded_body)
        sys.stdout.buffer.flush()
    except Exception as e:
        logging.error(f"Failed to send message: {e}")


# --- 业务逻辑:绘图函数 ---
def generate_plot(data):
    """
    根据传入的数据生成 Matplotlib 图表。
    在真实项目中,这个函数会非常复杂。
    """
    if not data or 'timestamps' not in data or 'values' not in data:
        raise ValueError("Invalid data format for plotting.")

    timestamps = data.get('timestamps', [])
    values = data.get('values', [])
    title = data.get('title', 'Time Series Data')

    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(timestamps, values, marker='o', linestyle='-')
    ax.set_title(title)
    ax.set_xlabel("Timestamp")
    ax.set_ylabel("Value")
    ax.grid(True)
    plt.xticks(rotation=45)
    fig.tight_layout()

    # 将图表渲染到内存中的二进制缓冲区
    buf = io.BytesIO()
    fig.savefig(buf, format='png')
    buf.seek(0)
    plt.close(fig) # 必须关闭,否则会内存泄漏
    
    return buf.getvalue()


def main_loop():
    """主事件循环"""
    logging.info("Python plot worker started.")
    while True:
        try:
            command = read_message()
            if command is None:
                # 协议错误,可能需要重启
                logging.error("Received invalid command, continuing loop.")
                continue

            if command.get("type") == "plot":
                logging.info(f"Processing plot command for device: {command.get('data', {}).get('title')}")
                # 在真实项目中,这里会有更复杂的错误处理
                png_data = generate_plot(command.get("data"))
                send_message(png_data)
                logging.info("Plot generated and sent successfully.")
            else:
                # 不支持的命令
                error_msg = f"Unknown command type: {command.get('type')}"
                logging.warning(error_msg)
                send_message(error_msg)

        except Exception as e:
            # 捕获所有异常,将其作为错误消息发送回 Elixir,并继续循环
            # 这保证了worker的健壮性,不会因为一次绘图失败而崩溃
            tb_str = traceback.format_exc()
            error_msg = f"An unhandled exception occurred: {e}\n{tb_str}"
            logging.error(error_msg)
            send_message(error_msg)


if __name__ == "__main__":
    main_loop()

这个 Python 脚本的几个关键设计:

  1. matplotlib.use('Agg'): 必须在导入 pyplot 之前调用。这告诉 Matplotlib 使用一个非交互式的后端,它不依赖任何 GUI 工具包,这对于在服务器环境中运行至关重要。
  2. 日志到 stderr: 所有日志都输出到标准错误流。这样,stdout 就被专门用于与 Elixir 的数据通信,避免了日志污染协议。
  3. 健壮的循环: main_loop 包含一个宽泛的 try...except 块。任何绘图或逻辑错误都会被捕获、记录,并作为错误消息发送回 Elixir,但不会导致整个 Python 进程崩溃。
  4. 资源管理: plt.close(fig) 是一个很容易被忽略但至关重要的步骤。每次调用 plt.figureplt.subplots 都会创建新的图形对象,如果不显式关闭,它们会一直驻留在内存中,最终导致内存泄漏。

第二步:Elixir GenServer 作为 Port 管理器

现在,我们需要在 Elixir 侧创建一个 GenServer 来管理这个 Python 进程的生命周期。这个 GenServer 将负责启动 Port,向其发送命令,并处理来自它的响应(无论是图片数据还是错误信息)。

lib/my_app/plot_worker.ex:

defmodule MyApp.PlotWorker do
  @moduledoc """
  A GenServer that manages a Python Matplotlib process via an Elixir Port.
  """
  use GenServer

  require Logger

  # 配置Python脚本路径和启动参数
  @python_executable "python3" # Or path from config
  @python_script "priv/python/plot_worker.py"
  # 我们需要一个唯一的worker名字,以便在应用中调用它
  @worker_name __MODULE__

  # --- Public API ---

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: @worker_name)
  end

  @doc """
  Asynchronously requests a plot to be generated.
  The result will be sent to the `caller` process.
  """
  def generate_plot(caller, plot_data) do
    # 使用 GenServer.call/3 来实现同步调用和超时控制
    GenServer.call(@worker_name, {:plot, caller, plot_data}, :infinity)
  end

  # --- GenServer Callbacks ---

  @impl true
  def init(_state) do
    # Port 启动参数
    # 'spawn_executable' 是安全执行外部程序的推荐方式
    # 'binary' 模式处理原始字节流
    # '{:packet, 4}' 自动处理4字节长度前缀协议
    # 'stderr_to_stdout: false' 保持 stderr 独立,用于日志
    # 'exit_status: true' 在进程退出时接收通知
    port_opts = [
      :spawn_executable,
      :binary,
      {:packet, 4},
      {:exit_status, true},
      args: [@python_script]
    ]

    case Port.open({:spawn_executable, @python_executable}, port_opts) do
      port when is_port(port) ->
        Logger.info("Python plot worker port started: #{inspect(port)}")
        # 状态机::idle表示空闲, {:busy, caller}表示正在处理请求
        # state: %{port: Port.t(), status: :idle | {:busy, pid}}
        {:ok, %{port: port, status: :idle}}

      {:error, reason} ->
        Logger.error("Failed to start Python plot worker port: #{inspect(reason)}")
        {:stop, {:port_start_failed, reason}}
    end
  end

  @impl true
  def handle_call({:plot, caller, plot_data}, from, state = %{status: :idle, port: port}) do
    Logger.info("Received plot request from #{inspect(caller)}")
    
    # 编码请求
    # 在真实项目中,应该使用更健壮的库如 Jason
    command = %{type: "plot", data: plot_data}
    encoded_command = Jason.encode!(command)

    # 通过 Port 发送命令
    # Port.command/2 是非阻塞的
    if Port.command(port, encoded_command) do
      # 更新状态为 :busy,并保存请求者的pid
      # 'from' 是 GenServer.call 内部的唯一引用,不能直接用
      new_state = %{state | status: {:busy, caller}}
      # 使用 :noreply,因为响应将从 Port 异步到达
      {:noreply, new_state}
    else
      # Port 已关闭或发送失败
      {:reply, {:error, :port_command_failed}, state}
    end
  end

  # 如果 worker 正忙,拒绝新的请求
  # 这是简单的背压机制
  def handle_call({:plot, _caller, _plot_data}, _from, state = %{status: {:busy, _}}) do
    Logger.warning("Plot worker is busy, rejecting new request.")
    {:reply, {:error, :worker_busy}, state}
  end

  # --- 处理来自 Port 的消息 ---
  @impl true
  def handle_info({port, {:data, data}}, state = %{port: port, status: {:busy, caller}}) do
    # 这里的 data 可能是二进制图片,也可能是JSON错误信息
    case Jason.decode(data) do
      {:ok, %{"status" => "error", "message" => msg}} ->
        # Python端返回了业务错误
        Logger.error("Python worker reported an error: #{msg}")
        GenServer.reply(caller, {:error, {:python_error, msg}})
      
      _ ->
        # 假设是成功的二进制图片数据
        Logger.info("Received plot binary data from Python worker.")
        GenServer.reply(caller, {:ok, data})
    end

    # 任务完成,将状态重置为 :idle
    {:noreply, %{state | status: :idle}}
  end
  
  # 如果在空闲时收到数据,说明协议有问题,记录下来
  def handle_info({port, {:data, data}}, state = %{port: port, status: :idle}) do
    Logger.warning("Received unexpected data from port while idle: #{inspect(data)}")
    {:noreply, state}
  end

  # --- 处理 Port 的生命周期事件 ---
  @impl true
  def handle_info({port, {:exit_status, status}}, state = %{port: port}) do
    Logger.error("Python worker process exited with status: #{status}. Crashing GenServer to be restarted by supervisor.")
    # 让 GenServer 崩溃,OTP 监督树会根据策略重启它,从而重新启动 Python 进程
    {:stop, {:python_process_crashed, status}, state}
  end
  
  # --- 终止回调,确保 Port 被关闭 ---
  @impl true
  def terminate(reason, state) do
    Logger.warn("PlotWorker terminating with reason: #{inspect(reason)}")
    if is_port(state.port), do: Port.close(state.port)
    :ok
  end
end

PlotWorker 的设计亮点:

  1. {:packet, 4}: 这是 Port.open/2 的一个神奇选项。它告诉 BEAM 自动处理长度前缀协议。我们只需要发送原始数据,BEAM 会为我们加上4字节的长度前缀;在接收时,BEAM 会先读取4字节长度,然后读取相应长度的数据块,再作为一个完整的消息传递给 GenServer。这极大地简化了协议处理。
  2. 状态机 (:idle, :busy): GenServer 内部维护一个简单的状态机。这确保了在任何时刻,它只处理一个绘图请求。这是一个简单的背压(back-pressure)实现,防止请求淹没 Python 进程。在真实项目中,可以升级为一个工作池(Pool)来处理并发请求。
  3. 监督与自愈: 当 Python 进程因任何原因崩溃时,Port 会关闭,并向 GenServer 发送一个 {:exit_status, status} 消息。handle_info 回调收到这个消息后,会选择让 GenServer 自身也崩溃 ({:stop, ...})。这看起来很暴力,但却是 OTP 的哲学:让它挂掉(Let it crash)。监督树会根据重启策略(如 :one_for_one)自动重启 PlotWorker GenServer,而在 init/1 回调中,一个新的 Python 进程将随之启动。系统实现了自动恢复。
  4. 解耦调用者: generate_plot/2 API 的设计很有趣。它接收一个 caller pid,并将结果异步地发送给它。这种设计使得 PlotWorker 可以被任何进程调用(例如一个 Phoenix Controller),而不仅仅是发起 GenServer.call 的进程。

第三步:集成到 Phoenix 和数据层

现在,我们将 PlotWorker 集成到 Phoenix 控制器中,并从 DynamoDB 获取数据。首先,把 PlotWorker 加入到应用监督树。

lib/my_app/application.ex:

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # ... other children like Repo and Endpoint
      MyApp.PlotWorker,
    ]
    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

接下来是 Phoenix 控制器。它负责解析请求、从 DynamoDB 拉取数据、调用 PlotWorker,最后将返回的图片二进制流发送给客户端。

lib/my_app_web/controllers/chart_controller.ex:

defmodule MyAppWeb.ChartController do
  use MyAppWeb, :controller

  alias MyApp.PlotWorker
  alias MyApp.Data.TimeSeries # 假设这是访问 DynamoDB 的模块

  def show(conn, %{"device_id" => device_id}) do
    case TimeSeries.fetch_data_for_device(device_id) do
      {:ok, data_points} ->
        # 准备传递给 Python 的数据结构
        plot_data = %{
          "title" => "Telemetry for #{device_id}",
          "timestamps" => Enum.map(data_points, & &1.timestamp),
          "values" => Enum.map(data_points, & &1.value)
        }

        # 调用 worker,设置一个合理的超时
        case PlotWorker.generate_plot(self(), plot_data) do
          {:ok, png_binary} ->
            conn
            |> put_resp_header("content-type", "image/png")
            |> send_resp(200, png_binary)

          {:error, :worker_busy} ->
            conn
            |> put_resp_header("retry-after", "10")
            |> send_resp(503, "Plotting service is busy, please try again later.")
          
          {:error, {:python_error, reason}} ->
            Logger.error("Plot generation failed in Python: #{reason}")
            send_resp(conn, 500, "Failed to generate plot due to an internal error.")

          {:error, reason} ->
            Logger.error("Plot worker failed with reason: #{inspect(reason)}")
            send_resp(conn, 500, "Internal server error.")
        end

      {:error, :not_found} ->
        send_resp(conn, 404, "Device not found.")
    end
  end
end

# 模拟的数据模块
defmodule MyApp.Data.TimeSeries do
  # 在真实项目中,这里会使用 ExAws.Dynamo 或其他库
  def fetch_data_for_device(device_id) do
    Logger.info("Fetching data for #{device_id} from DynamoDB...")
    # 模拟返回数据
    # In a real scenario, this would be a DynamoDB query.
    timestamps = 1..20
    |> Enum.map(&(&1 * 1000 + 1672531200)) # Fake timestamps
    |> Enum.map(&DateTime.from_unix!(&1, :millisecond))
    |> Enum.map(&DateTime.to_iso8601(&1))
    
    values = for _ <- 1..20, do: :rand.uniform() * 100
    
    data_points = Enum.zip(timestamps, values)
    |> Enum.map(fn {ts, val} -> %{timestamp: ts, value: val} end)

    {:ok, data_points}
  end
end

控制器清晰地处理了各种成功和失败的情况,包括 worker 忙碌(返回 503 Service Unavailable),Python 内部错误,以及数据未找到等。

第四步:Vite 前端展示

前端部分相对简单。我们只需要一个 <img> 标签,其 src 属性指向我们的 Phoenix API 端点。

一个简单的 React 组件 ChartDisplay.jsx:

import React, { useState } from 'react';

function ChartDisplay({ deviceId }) {
  const [timestamp, setTimestamp] = useState(Date.now());

  // 在API URL后附加时间戳,以防止浏览器缓存
  const chartUrl = `/api/charts/${deviceId}?t=${timestamp}`;

  const refreshChart = () => {
    setTimestamp(Date.now());
  };

  return (
    <div>
      <h3>Chart for {deviceId}</h3>
      <img 
        src={chartUrl} 
        alt={`Chart for device ${deviceId}`} 
        style={{ maxWidth: '100%', border: '1px solid #ccc' }}
      />
      <br />
      <button onClick={refreshChart}>Refresh Chart</button>
    </div>
  );
}

export default ChartDisplay;

当前方案的局限性与未来迭代

这套基于 Port 的架构在我们的场景中表现优异。它轻量、高效,且充分利用了 OTP 的容错能力。然而,它并非没有局限性。

当前的 PlotWorker 是一个单例 GenServer,这意味着它一次只能处理一个绘图请求,后续的请求必须排队或被拒绝。对于更高的并发需求,一个直接的优化路径是使用像 poolboy 这样的库来创建一个 GenServer 池,每个 GenServer 管理一个独立的 Python 进程。请求可以被分发到池中的任何一个空闲 worker,从而实现并行处理。

其次,我们自定义的二进制协议虽然高效,但也相对简单。如果 Elixir 与 Python 之间的交互变得更加复杂,例如需要双向流式通信或更复杂的 RPC 调用,那么维护这个自定义协议的成本会增加。在这种情况下,可以考虑转向更标准化的方案,如 gRPC。但这同样会引入新的依赖和配置复杂性,是一种需要权衡的 trade-off。

最后,Python 环境的管理是一个运维挑战。在部署时,必须确保目标服务器上安装了正确的 Python 版本以及所有依赖库(numpy, matplotlib)。使用容器化(Docker)可以将 Elixir 应用和 Python 环境打包在一起,是解决这个问题的有效手段,能确保开发、测试和生产环境的一致性。


  目录