Building a Real-Time State Sync Layer for Heterogeneous Clients: Integrating Elixir Phoenix, Jetpack Compose and a PWA


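We faced a tricky engineering problem: a core business feature had to be served by both a high-performance native Android app and a lightweight web client (a PWA). The key challenge was that when a user acts on one client, the resulting state must reach every other client reliably and in real time, including the user's own other devices. For example, when someone adjusts a parameter on a shared dashboard, every observer, whether in the native app or the PWA, should see the change immediately.

HTTP polling is unacceptable here. Every client keeps hitting the server even when nothing has changed, and a change only shows up at the next poll. Separate APIs and backends per client would turn data consistency into a nightmare and double the maintenance cost. We needed one efficient real-time layer that serves both Kotlin on the JVM and JavaScript in the browser.

Our initial design was a WebSocket-based publish/subscribe (Pub/Sub) service, and after evaluating the options we chose Elixir and the Phoenix framework. The BEAM VM's built-in concurrency, lightweight processes and strong support for long-lived connections make it a natural platform for this kind of system. Phoenix Channels wrap WebSockets with topic subscriptions, broadcasting and per-connection state out of the box.

This post covers the whole path: building the backend service, integrating the native client, integrating the PWA client, and finally building and verifying the heterogeneous system in a CI/CD pipeline. It is not an introductory tutorial. It is a retrospective on the implementation details of a production-oriented real-time sync layer.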

Step 1: The Elixir/Phoenix Backend, the Foundation of State Management

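The core idea is to start a dedicated backend process for each "session" or "resource" that needs syncing (for example a shared dashboard, dashboard:123), and let that process own its state. Elixir's GenServer is the right tool for this. It is a process, normally run under a supervisor, that handles its messages one at a time and therefore behaves like a serialized state machine.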
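"Handles its messages one at a time" is the property the design relies on. Below is a small single-threaded JavaScript model of that mailbox behavior. `Mailbox` is a hypothetical helper for illustration, not an Elixir or Phoenix API. It models only message ordering, not process isolation or supervision.

```javascript
// Single-threaded model of GenServer mailbox semantics (hypothetical helper, not an
// Elixir/Phoenix API): messages are queued and handled strictly one at a time, and a
// message sent from inside a handler waits its turn instead of running nested.
class Mailbox {
  constructor(initialState, handler) {
    this.state = initialState; // owned exclusively by this mailbox, like GenServer state
    this.handler = handler;    // (message, state, cast) => newState
    this.queue = [];
    this.draining = false;
  }

  cast(message) {
    this.queue.push(message);
    if (this.draining) return; // a handler is running: this message is handled after it
    this.draining = true;
    try {
      while (this.queue.length > 0) {
        const next = this.queue.shift();
        this.state = this.handler(next, this.state, (m) => this.cast(m));
      }
    } finally {
      this.draining = false;
    }
  }
}

// Usage: the first "inc" casts a second one from inside its own handler.
const log = [];
const box = new Mailbox({ value: 0 }, (msg, state, cast) => {
  log.push(`start ${msg}`);
  if (state.value === 0) cast(msg); // queued, not executed re-entrantly
  log.push(`end ${msg}`);
  return { value: state.value + 1 };
});
box.cast("inc");
console.log(log.join(" | ")); // start inc | end inc | start inc | end inc
console.log(box.state.value); // 2
```

Each handler sees the state produced by the previous one. This is why concurrent update requests to one SessionServer never interleave halfway through a merge.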

We create a SessionServer that manages the state of one session, and a Phoenix Channel that serves as the gateway through which clients talk to it.

graph TD
    subgraph "Android App (Jetpack Compose)"
        A[OkHttp WebSocket] --> B{Phoenix Channel Client}
    end
    subgraph "Web App (PWA)"
        C[phoenix.js WebSocket] --> D{Phoenix Channel Client}
    end
    subgraph "Elixir/Phoenix Backend"
        E[Phoenix Channel Endpoint]
        F(SessionChannel)
        G((GenServer per Session))
        H(PubSub)
    end
    B --> E
    D --> E
    E --> F
    F -- interacts with --> G
    G -- broadcasts via --> H
    H -- pushes to --> F
    F -- pushes to --> B
    F -- pushes to --> D

1. The state management core: SessionServer

This GenServer holds and updates the state of one session. Whenever the state changes, it broadcasts the update through Phoenix PubSub.

lib/live_sync/session_server.ex

defmodule LiveSync.SessionServer do
  use GenServer
  require Logger

  # Public API
  def start_link(session_id) do
    GenServer.start_link(__MODULE__, %{session_id: session_id, state: initial_state()}, name: via_tuple(session_id))
  end

  def get_state(session_id) do
    GenServer.call(via_tuple(session_id), :get_state)
  end

  def update_state(session_id, new_partial_state) do
    GenServer.cast(via_tuple(session_id), {:update_state, new_partial_state})
  end

  # GenServer Callbacks
  @impl true
  def init(args) do
    Logger.info("SessionServer for #{args.session_id} started.")
    {:ok, args}
  end

  @impl true
  def handle_call(:get_state, _from, state) do
    {:reply, state.state, state}
  end

  @impl true
  def handle_cast({:update_state, partial_state}, state) do
    # In a production scenario, real merge logic and validation would live here.
    new_state_map = Map.merge(state.state, partial_state)

    # Broadcast the full new state to every subscriber of this session's topic
    topic = "session:#{state.session_id}"
    LiveSyncWeb.Endpoint.broadcast!(topic, "state_updated", %{state: new_state_map})

    Logger.debug("State updated for #{state.session_id}: #{inspect(new_state_map)}")
    {:noreply, %{state | state: new_state_map}}
  end

  # Not guaranteed to run, e.g. when an untrapped exit signal kills the process
  @impl true
  def terminate(reason, state) do
    Logger.warning("SessionServer for #{state.session_id} terminating. Reason: #{inspect(reason)}")
    :ok
  end

  # Private helpers
  defp via_tuple(session_id), do: {:via, Registry, {LiveSync.SessionRegistry, session_id}}
  
  defp initial_state, do: %{
    "title" => "Untitled Dashboard",
    "value" => 0,
    "last_updated_by" => nil
  }
end
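The handle_cast/2 above boils down to two steps: a shallow Map.merge/2 of the client's partial update into the current state, then a broadcast of the full merged state. Here is a JavaScript model of those semantics. `initialState` and `applyUpdate` are hypothetical names, and the `broadcast` callback stands in for `Endpoint.broadcast!/3`.

```javascript
// JavaScript model of SessionServer's handle_cast (hypothetical helper names):
// Map.merge/2 semantics for a flat, string-keyed state, then a broadcast of the
// *full* merged state, matching the "state_updated" payload %{state: new_state_map}.
const initialState = () => ({ title: "Untitled Dashboard", value: 0, last_updated_by: null });

function applyUpdate(state, partial, broadcast) {
  // Shallow merge: keys in `partial` win and all other keys are kept. Like Map.merge/2,
  // nested maps are replaced wholesale rather than merged recursively.
  const next = { ...state, ...partial };
  broadcast("state_updated", { state: next });
  return next;
}

// Usage: an Android-style partial update that only touches `value`.
const sent = [];
const s0 = initialState();
const s1 = applyUpdate(s0, { value: 5, last_updated_by: "AndroidClient" },
  (event, payload) => sent.push({ event, payload }));
console.log(s1.title, s1.value); // Untitled Dashboard 5
console.log(s0.value);           // 0 (the previous state is not mutated)
```

Because the full state is broadcast, a client that missed an earlier update is corrected by the next one. Any client can also add arbitrary keys, which is why the validation mentioned in the code matters.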

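We use a Registry to look up these dynamically started GenServer processes by session_id. The Registry has to be started in the application's supervision tree, for example as {Registry, keys: :unique, name: LiveSync.SessionRegistry}. Because names are unique, starting a second server for the same session_id returns {:error, {:already_started, pid}} instead of creating a duplicate.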
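The property that matters for joins can be modeled in a few lines of JavaScript. `SessionRegistry`, `startChild` and `ensureStarted` are hypothetical names standing in for Registry plus a supervisor. The model keeps one owner per session_id. Starting an id that is already registered is reported as `already_started` together with the existing owner, and a joining channel should treat that as success.

```javascript
// Hypothetical in-memory stand-in for Registry + a supervisor: at most one owner
// per session id; starting an id twice returns the existing owner, mirroring
// {:error, {:already_started, pid}} in Elixir.
class SessionRegistry {
  constructor(startOwner) {
    this.owners = new Map(); // session id -> owner
    this.startOwner = startOwner;
  }

  startChild(sessionId) {
    const existing = this.owners.get(sessionId);
    if (existing !== undefined) return { error: "already_started", owner: existing };
    const owner = this.startOwner(sessionId);
    this.owners.set(sessionId, owner);
    return { ok: owner };
  }

  // What a channel join needs: both "ok" and "already_started" mean the session is up.
  ensureStarted(sessionId) {
    const result = this.startChild(sessionId);
    return result.ok !== undefined ? result.ok : result.owner;
  }
}

// Usage: two clients join the same session.
let starts = 0;
const registry = new SessionRegistry((id) => ({ id, instance: ++starts }));
const first = registry.ensureStarted("dashboard_123");
const second = registry.ensureStarted("dashboard_123");
console.log(first === second, starts); // true 1
```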

2. The communication gateway: SessionChannel

This channel handles clients joining and leaving, and the events they push.

lib/live_sync_web/channels/session_channel.ex

defmodule LiveSyncWeb.SessionChannel do
  use Phoenix.Channel
  require Logger
  alias LiveSync.SessionServer

  @impl true
  def join("session:" <> session_id, _payload, socket) do
    # In a real project, authentication and authorization are mandatory here.

    # Ensure the session's GenServer is running. It is started under a DynamicSupervisor
    # (assumed to be in the application tree as LiveSync.SessionSupervisor) instead of via
    # start_link/1, which would link it to this channel process and kill the session when
    # the first client leaves. Process.whereis/1 only accepts atoms and cannot resolve a
    # :via tuple; treating :already_started as success also covers concurrent joins.
    case DynamicSupervisor.start_child(LiveSync.SessionSupervisor, {SessionServer, session_id}) do
      {:ok, _pid} -> :ok
      {:error, {:already_started, _pid}} -> :ok
    end

    # No explicit PubSub subscribe: a joined channel is already subscribed to its own
    # topic, so "state_updated" broadcasts on "session:<id>" reach the client directly.

    # Reply to the join with the current state so the new client starts in sync
    current_state = SessionServer.get_state(session_id)

    Logger.info("Client #{inspect(socket.assigns[:user_id])} joined session:#{session_id}")
    {:ok, %{state: current_state}, assign(socket, :session_id, session_id)}
  end

  # A client pushes an update event
  @impl true
  def handle_in("update_state", payload, socket) do
    session_id = socket.assigns.session_id

    # Validate the payload here (allowed keys, types) before forwarding it.

    # Forward the update to the GenServer. update_state/2 is a cast, so the :ok reply only
    # acknowledges receipt; the applied state arrives via the "state_updated" broadcast.
    SessionServer.update_state(session_id, payload)

    {:reply, :ok, socket}
  end

  # Broadcasts on this channel's topic bypass handle_out/3 and go straight to the
  # client, unless the event is listed in intercept/1.

  @impl true
  def terminate(reason, socket) do
    Logger.warning("Channel for session #{socket.assigns.session_id} terminated. Reason: #{inspect(reason)}")
    :ok
  end
end

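That completes the core backend logic. A state process is created on demand for each session and exposed to clients through a Channel. The interface is not secure until the authentication and authorization noted in join/3 are implemented.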
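Whichever client library is used, join/3, the handle_in/3 replies and the `state_updated` broadcasts travel over the socket as Phoenix Channels frames. phoenix.js uses the V2 JSON serializer by default. With it, each frame is a JSON array `[join_ref, ref, topic, event, payload]`. A join is the `phx_join` event, and replies arrive as `phx_reply` with a `{status, response}` payload. The sketch below encodes and decodes such frames. The helper names are hypothetical, and real clients do this internally, along with heartbeats on the `phoenix` topic.

```javascript
// Sketch of the Phoenix Channels V2 JSON frame layout: [join_ref, ref, topic, event, payload].
// encodeFrame/decodeFrame are hypothetical helpers; phoenix.js does this internally.
function encodeFrame({ joinRef = null, ref = null, topic, event, payload = {} }) {
  return JSON.stringify([joinRef, ref, topic, event, payload]);
}

function decodeFrame(text) {
  const [joinRef, ref, topic, event, payload] = JSON.parse(text);
  // Replies to a client push (including the join) arrive as "phx_reply" with
  // payload {status, response}; `ref` matches the ref of the original push.
  if (event === "phx_reply") {
    return { kind: "reply", joinRef, ref, topic, status: payload.status, response: payload.response };
  }
  // Anything else is a server-initiated message such as a "state_updated" broadcast.
  return { kind: "message", joinRef, ref, topic, event, payload };
}

// Usage: the frame a PWA join would send, and the join reply carrying the initial state.
const joinFrame = encodeFrame({
  joinRef: "1", ref: "1", topic: "session:dashboard_123", event: "phx_join", payload: { client: "PWA" },
});
console.log(joinFrame); // ["1","1","session:dashboard_123","phx_join",{"client":"PWA"}]
const reply = decodeFrame('["1","1","session:dashboard_123","phx_reply",{"status":"ok","response":{"state":{"value":0}}}]');
console.log(reply.kind, reply.status, reply.response.state.value); // reply ok 0
```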

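Step 2: Integrating the Jetpack Compose Native Client

The hard part on Android is finding a stable Phoenix Channels client that works well with coroutines. We chose a community-maintained library and wrapped it to fit our MVVM architecture.

1. Dependencies and WebSocket client setup

We need a WebSocket client (such as OkHttp) and a library implementing the Phoenix Channels protocol. Here we use phoenix-channels-client-kotlin.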

build.gradle.kts

dependencies {
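    // OkHttp's artifact id is "okhttp" (the group id is com.squareup.okhttp3)
    implementation("com.squareup.okhttp3:okhttp:4.11.0")
    implementation("com.github.dsrees:phoenix-channels-client-kotlin:2.0.0")
    // Jackson with the Kotlin module, used by SessionRepository to decode state payloads
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.15.2")
    // Coroutines & ViewModel, plus viewModel() for Compose (used by SessionScreen)
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
    implementation("androidx.lifecycle:lifecycle-viewmodel-ktx:2.6.2")
    implementation("androidx.lifecycle:lifecycle-viewmodel-compose:2.6.2")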
}

2. Wrapping it in a SessionRepository

To decouple the networking logic from the UI, a Repository manages the channel connection and exposes the session state as a flow.

SessionRepository.kt

import android.util.Log
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.github.dsrees.phoenix.Channel
import com.github.dsrees.phoenix.Socket
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.launch
import okhttp3.OkHttpClient
import java.util.concurrent.TimeUnit

data class SessionState(
    val title: String = "Loading...",
    val value: Int = 0,
    // The server sends this key in snake_case
    @JsonProperty("last_updated_by") val lastUpdatedBy: String? = null
)

class SessionRepository(private val scope: CoroutineScope) {

    private val _sessionState = MutableStateFlow(SessionState())
    val sessionState = _sessionState.asStateFlow()

    private var socket: Socket? = null
    private var channel: Channel? = null

    // Ignore keys the client does not model yet instead of failing the whole update
    private val mapper = jacksonObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

    companion object {
        private const val SOCKET_URL = "ws://10.0.2.2:4000/socket/websocket" // 10.0.2.2 is the host machine for Android Emulator
        private const val TAG = "SessionRepository"
    }

    fun connect(sessionId: String) {
        if (socket?.isConnected == true) {
            Log.w(TAG, "Already connected.")
            return
        }

        val okHttpClient = OkHttpClient.Builder()
            .connectTimeout(10, TimeUnit.SECONDS)
            .readTimeout(10, TimeUnit.SECONDS)
            .build()
        
        socket = Socket(SOCKET_URL, okHttpClient).apply {
            onOpen { Log.i(TAG, "Socket opened.") }
            onClose { Log.w(TAG, "Socket closed.") }
            onError { t, _ -> Log.e(TAG, "Socket error", t) }
        }

        val topic = "session:$sessionId"
        channel = socket?.channel(topic, mapOf("client" to "Android"))?.apply {
            join()
                .receive("ok") { message ->
                    scope.launch(Dispatchers.Main) {
                        // The join reply carries the initial state. convertValue works whether
                        // the library hands us a Map or a JsonNode; toString() on a Map is not JSON.
                        val rawState = message.payload.get("state") ?: return@launch
                        _sessionState.value = mapper.convertValue(rawState, SessionState::class.java)
                        Log.i(TAG, "Joined channel successfully. Initial state received.")
                    }
                }
                .receive("error") {
                    Log.e(TAG, "Failed to join channel: ${it.payload}")
                }
                .receive("timeout") {
                    Log.e(TAG, "Join timeout.")
                }

            // Listen for state update broadcasts from the server
            on("state_updated") { message ->
                scope.launch(Dispatchers.Main) {
                    val rawState = message.payload.get("state") ?: return@launch
                    _sessionState.value = mapper.convertValue(rawState, SessionState::class.java)
                    Log.d(TAG, "State updated from broadcast: ${_sessionState.value}")
                }
            }
        }
        
        socket?.connect()
    }

    fun updateValue(newValue: Int) {
        if (channel?.isJoined != true) {
            Log.e(TAG, "Cannot update state, channel not joined.")
            return
        }
        
        scope.launch(Dispatchers.IO) {
            // In a real project, updates need a richer state model and an ack mechanism.
            val payload = mapOf(
                "value" to newValue,
                "last_updated_by" to "AndroidClient"
            )
            channel?.push("update_state", payload)
        }
    }

    fun disconnect() {
        channel?.leave()
        socket?.disconnect()
        socket = null
        channel = null
        Log.i(TAG, "Disconnected.")
    }
}
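Whatever JSON library the Android side uses, the decoding step has three jobs. It maps the server's snake_case keys (`last_updated_by`) to the client's camelCase fields (`lastUpdatedBy`), falls back to the `SessionState` defaults for missing keys, and ignores keys the client does not know. Here is a JavaScript sketch of those rules, with `decodeSessionState` as a hypothetical name. The same function would also serve the PWA if it preferred camelCase internally.

```javascript
// Hypothetical decoder mirroring the Kotlin SessionState defaults: maps snake_case
// server keys to camelCase, falls back to defaults, and drops unknown keys.
const DEFAULT_SESSION_STATE = Object.freeze({ title: "Loading...", value: 0, lastUpdatedBy: null });

function decodeSessionState(serverState) {
  const s = serverState ?? {};
  return {
    title: typeof s.title === "string" ? s.title : DEFAULT_SESSION_STATE.title,
    // `value` must be an integer, like the Kotlin Int field
    value: Number.isInteger(s.value) ? s.value : DEFAULT_SESSION_STATE.value,
    lastUpdatedBy: typeof s.last_updated_by === "string" ? s.last_updated_by : null,
  };
}

// Usage: an unknown key ("extra") is ignored, snake_case is mapped.
const decoded = decodeSessionState({ title: "Ops", value: 3, last_updated_by: "PWAClient", extra: true });
console.log(JSON.stringify(decoded)); // {"title":"Ops","value":3,"lastUpdatedBy":"PWAClient"}
```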

3. ViewModel and Compose UI

SessionViewModel owns the Repository and exposes its StateFlow to the UI.

SessionViewModel.kt

import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope

class SessionViewModel : ViewModel() {
    private val repository = SessionRepository(viewModelScope)
    val sessionState = repository.sessionState

    fun connectToSession(sessionId: String) {
        repository.connect(sessionId)
    }
    
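    // Sends an absolute value (current + 1), so two clients incrementing at the same
    // moment can overwrite each other; the server simply keeps the last write.
    fun incrementValue() {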
        val currentValue = sessionState.value.value
        repository.updateValue(currentValue + 1)
    }

    override fun onCleared() {
        super.onCleared()
        repository.disconnect()
    }
}

The UI layer simply collects this StateFlow.

SessionScreen.kt

import androidx.compose.foundation.layout.*
import androidx.compose.material3.*
import androidx.compose.runtime.*
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.unit.dp
import androidx.lifecycle.viewmodel.compose.viewModel

@Composable
fun SessionScreen(viewModel: SessionViewModel = viewModel()) {
    val state by viewModel.sessionState.collectAsState()
    
    LaunchedEffect(Unit) {
        // Connect to a specific session
        viewModel.connectToSession("dashboard_123")
    }

    Column(
        modifier = Modifier.fillMaxSize().padding(16.dp),
        horizontalAlignment = Alignment.CenterHorizontally,
        verticalArrangement = Arrangement.Center
    ) {
        Text(text = state.title, style = MaterialTheme.typography.headlineMedium)
        Spacer(modifier = Modifier.height(24.dp))
        
        Text(text = "Current Value: ${state.value}", style = MaterialTheme.typography.titleLarge)
        Spacer(modifier = Modifier.height(8.dp))
        
        Text(
            text = "Last updated by: ${state.lastUpdatedBy ?: "N/A"}",
            style = MaterialTheme.typography.bodySmall
        )
        Spacer(modifier = Modifier.height(32.dp))
        
        Button(onClick = { viewModel.incrementValue() }) {
            Text("Increment from Android")
        }
    }
}

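The Android app can now connect to the Elixir backend and send and receive state updates in real time.

Step 3: Integrating the PWA Client

The web side is simpler, because Phoenix ships an official JavaScript client, phoenix.js.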

assets/js/app.js

import { Socket } from "phoenix";

class SessionConnector {
    constructor(sessionId, stateUpdateCallback) {
        this.sessionId = sessionId;
        this.stateUpdateCallback = stateUpdateCallback;
        this.socket = new Socket("/socket", { params: { token: window.userToken } });
        this.channel = null;
    }

    connect() {
        this.socket.connect();
        const topic = `session:${this.sessionId}`;
        this.channel = this.socket.channel(topic, { client: "PWA" });

        this.channel.join()
            .receive("ok", resp => {
                console.log("Joined PWA to session successfully", resp);
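                // Initial state; phoenix.js runs this hook again after each automatic rejoin
                this.stateUpdateCallback(resp.state);
            })
            .receive("error", resp => {
                console.error("Unable to join PWA to session", resp);
                // Don't call this.connect() again here: it would create a duplicate channel
                // for the same topic. phoenix.js already retries failed joins (rejoinAfterMs)
                // and reconnects a dropped socket (reconnectAfterMs), and the "ok" hook above
                // delivers the latest server state once the rejoin succeeds.
            })
            .receive("timeout", () => console.warn("Join timed out; phoenix.js will retry"));

        // Listen for state updates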
        this.channel.on("state_updated", payload => {
            console.log("Received state update:", payload.state);
            this.stateUpdateCallback(payload.state);
        });
    }

    updateTitle(newTitle) {
        if (!this.channel) {
            console.error("Cannot update, channel is not initialized.");
            return;
        }
        this.channel.push("update_state", {
            title: newTitle,
            last_updated_by: "PWAClient"
        })
        .receive("error", e => console.error("Update failed", e))
        .receive("timeout", () => console.warn("Update was not acknowledged in time"));
    }
}

// UI binding
document.addEventListener('DOMContentLoaded', () => {
    const titleEl = document.getElementById('session-title');
    const valueEl = document.getElementById('session-value');
    const updatedByEl = document.getElementById('session-updated-by');
    const updateBtn = document.getElementById('update-title-btn');

    const updateUI = (state) => {
        titleEl.textContent = state.title;
        valueEl.textContent = `Current Value: ${state.value}`;
        updatedByEl.textContent = `Last updated by: ${state.last_updated_by || 'N/A'}`;
    };

    const connector = new SessionConnector("dashboard_123", updateUI);
    connector.connect();

    updateBtn.addEventListener('click', () => {
        const newTitle = `PWA Title @ ${new Date().toLocaleTimeString()}`;
        connector.updateTitle(newTitle);
    });
});
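phoenix.js retries on its own. A failed or timed-out join is retried, and a dropped socket reconnects, with delays taken from the Socket options `rejoinAfterMs` and `reconnectAfterMs`. Each option is a function from the attempt count (starting at 1) to a delay in milliseconds. Many clients may drop at once, for example after a backend deploy. In that case, a capped exponential backoff with "full jitter" spreads their reconnects out. `makeBackoff` is a hypothetical helper, and the 500 ms base and 10 s cap are illustrative values, not phoenix.js defaults.

```javascript
// Hypothetical backoff for the phoenix.js Socket options reconnectAfterMs / rejoinAfterMs:
// capped exponential growth with full jitter (a uniform random delay below the ceiling).
function makeBackoff({ baseMs = 500, maxMs = 10_000, random = Math.random } = {}) {
  return (tries) => {
    const ceiling = Math.min(maxMs, baseMs * 2 ** Math.max(0, tries - 1));
    return Math.floor(random() * ceiling);
  };
}

// Usage with phoenix.js (shown as a comment, not executed here):
// const socket = new Socket("/socket", {
//   params: { token: window.userToken },
//   reconnectAfterMs: makeBackoff(),
//   rejoinAfterMs: makeBackoff({ baseMs: 1000 }),
// });

console.log(makeBackoff({ random: () => 1 })(3)); // 2000 (the ceiling for the 3rd attempt)
```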

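Now, tapping the button in the Android app increments the value and the PWA updates immediately. Conversely, updating the title in the PWA is reflected in the Android UI.

Step 4: The Mobile CI/CD Pipeline

A reliable CI/CD pipeline for the native app is essential. Besides building the app, it should run tests that catch regressions in the real-time sync logic. We use GitHub Actions to demonstrate.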

.github/workflows/android_ci.yml

name: Android CI/CD

on:
  push:
    branches: [ "main" ]
  pull_request:
    branches: [ "main" ]

jobs:
  build_and_test:
    runs-on: ubuntu-latest
    
    steps:
    - name: Checkout repository
      uses: actions/checkout@v4

    - name: Set up JDK 17
      uses: actions/setup-java@v4
      with:
        java-version: '17'
        distribution: 'temurin'
        cache: 'gradle'

    - name: Grant execute permission for gradlew
      run: chmod +x ./gradlew

    - name: Run unit tests
      run: ./gradlew testDebugUnitTest
      # Unit tests cannot verify the connection to a real backend; that would need
      # a mocked WebSocket server, which is complex and can be inaccurate.

    - name: Build debug APK
      run: ./gradlew assembleDebug

    - name: Upload debug APK
      uses: actions/upload-artifact@v4
      with:
        name: app-debug
        path: app/build/outputs/apk/debug/app-debug.apk

  # A more complete pipeline should include an E2E job:
  # e2e_test:
  #   needs: build_and_test
  #   runs-on: ubuntu-latest # service containers only run on Linux runners; the emulator also needs KVM
  #   # The Elixir backend is declared as a job-level `services:` container, not started in a step
  #   steps:
  #   - ...
  #   - name: Run Android Instrumented Tests
  #     uses: reactivecircus/android-emulator-runner@v2
  #     with:
  #       api-level: 29
  #       script: ./gradlew connectedCheck

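This pipeline covers the basic build and unit tests. A real production pipeline is more involved and adds an e2e_test job that:

  1. Starts our Elixir/Phoenix application in a service container.
  2. Boots an Android emulator with a tool such as android-emulator-runner.
  3. Runs Android instrumented tests that talk to the containerized backend and verify that end-to-end real-time sync works. Inside the emulator, the runner host, where the service container's port is published, is reachable at 10.0.2.2, the address SessionRepository already uses. This is a key part of keeping the system resilient.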

Limitations and Future Iterations

The architecture works well, but it has some clear limitations:

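  1. GenServer state persistence: the state lives only in process memory, so it is lost when the process crashes or the server restarts. An ETS (Erlang Term Storage) table owned by a separate, long-lived process can survive a SessionServer crash, but not a node restart. For durable state, use an external store such as PostgreSQL or Redis: load the state when the GenServer starts and write to the store on every update.
  2. Cluster scaling: once a single node can no longer host all the GenServer processes, the Elixir application must run as a multi-node cluster. Libraries such as Horde can then distribute and register the GenServers across the cluster, so that a client connected to any node finds the right state process. Phoenix PubSub already delivers broadcasts across connected nodes.
  3. Offline clients: the current implementation gives no consistency guarantee while a client is offline; after reconnecting, the client simply fetches the latest state. Applications that must handle offline edits and merge conflicts, such as collaborative documents, need CRDTs (Conflict-free Replicated Data Types) or Operational Transformation.
  4. CI/CD maturity: the end-to-end job in the pipeline is only a sketch. Building a stable, efficient end-to-end environment with a real backend and an emulator is an engineering task in its own right.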
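Point 3 can be made concrete with the counter in this article. `incrementValue` in the ViewModel pushes an absolute value (current + 1), and the server keeps the last write. Two clients that increment at the same moment therefore end at +1 instead of +2. A state-based CRDT avoids this by exchanging mergeable state instead of the computed result. Below is a minimal PN-Counter, one of the simplest CRDTs. Wiring it into SessionServer, which would merge counter states instead of calling Map.merge, is an assumption and not something the current code does.

```javascript
// Minimal PN-Counter CRDT (a standard state-based CRDT). Each replica only bumps its
// own slots; merge takes the per-replica maximum, so merging is commutative,
// associative and idempotent, and replicas converge regardless of delivery order.
class PNCounter {
  constructor(replicaId) {
    this.id = replicaId;
    this.p = {}; // replicaId -> total increments made by that replica
    this.n = {}; // replicaId -> total decrements made by that replica
  }
  increment(by = 1) { this.p[this.id] = (this.p[this.id] ?? 0) + by; }
  decrement(by = 1) { this.n[this.id] = (this.n[this.id] ?? 0) + by; }
  value() {
    const sum = (m) => Object.values(m).reduce((a, b) => a + b, 0);
    return sum(this.p) - sum(this.n);
  }
  merge(other) {
    for (const [k, v] of Object.entries(other.p)) this.p[k] = Math.max(this.p[k] ?? 0, v);
    for (const [k, v] of Object.entries(other.n)) this.n[k] = Math.max(this.n[k] ?? 0, v);
  }
}

// Usage: both clients see 0 and increment "concurrently", then exchange state.
const android = new PNCounter("android");
const pwa = new PNCounter("pwa");
android.increment();
pwa.increment();
android.merge(pwa);
pwa.merge(android);
console.log(android.value(), pwa.value()); // 2 2 (absolute "0 + 1" writes would end at 1)
```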
