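We faced a thorny engineering problem: a core business feature had to be served both by a high-performance native Android app and by a lightweight web client (a PWA). The key challenge is that when a user performs an action on one client, the state must be synchronized to every other client in real time and reliably, including the user's own other devices. For example, when someone adjusts a parameter on a shared dashboard, every observer, whether on the native app or the PWA, should see the change immediately.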
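Traditional HTTP polling is simply unacceptable here: it puts heavy load on the server and introduces unavoidable latency. Separate APIs and database backends for each client would create a data-consistency nightmare and double the maintenance cost. We needed a single, efficient real-time communication layer that could serve both Kotlin on the JVM and JavaScript in the browser.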
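The initial idea was a WebSocket-based publish/subscribe (Pub/Sub) service. After evaluating the options, we settled on Elixir and its Phoenix framework. The choice was deliberate. The BEAM virtual machine has built-in concurrency, a lightweight process model, and strong support for long-lived connections, which make it an ideal platform for this kind of system. Phoenix Channels wrap WebSockets cleanly and provide topic subscription, broadcasting, and per-connection state out of the box.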
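To make the Pub/Sub idea concrete, here is a minimal in-memory, topic-keyed publish/subscribe sketch in JavaScript. It only illustrates the semantics Phoenix PubSub provides: subscribers register under a topic string such as session:123, and a broadcast reaches every current subscriber of that topic. The TopicBus class and its method names are hypothetical, not Phoenix APIs, and the sketch ignores distribution across nodes.

```javascript
// Hypothetical in-memory topic bus; it illustrates Pub/Sub semantics only.
class TopicBus {
  constructor() {
    // topic string -> Set of subscriber callbacks
    this.topics = new Map();
  }

  subscribe(topic, callback) {
    if (!this.topics.has(topic)) this.topics.set(topic, new Set());
    this.topics.get(topic).add(callback);
    // Return an unsubscribe function so callers can clean up when they leave.
    return () => {
      const subs = this.topics.get(topic);
      if (!subs) return;
      subs.delete(callback);
      if (subs.size === 0) this.topics.delete(topic);
    };
  }

  // Deliver (event, payload) to every current subscriber of `topic`
  // and return how many subscribers received it.
  broadcast(topic, event, payload) {
    const subs = this.topics.get(topic);
    if (!subs) return 0;
    const snapshot = [...subs]; // copy so callbacks may unsubscribe safely
    for (const callback of snapshot) callback(event, payload);
    return snapshot.length;
  }
}

const bus = new TopicBus();
const seen = [];
const off = bus.subscribe("session:123", (event, payload) => seen.push([event, payload.value]));
bus.subscribe("session:456", () => seen.push("wrong topic"));
bus.broadcast("session:123", "state_updated", { value: 1 });
off();
bus.broadcast("session:123", "state_updated", { value: 2 }); // no subscribers left
// seen is now [["state_updated", 1]]
```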
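This article walks through the whole process. It covers building the backend service, integrating the native client, and integrating the PWA client. It ends with building and verifying this heterogeneous system automatically in a CI/CD pipeline. It is not an introductory tutorial but a retrospective on the implementation details of a production-grade real-time synchronization layer.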
Step 1: The Elixir/Phoenix Backend, the Foundation of State Management
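The core idea is to start a dedicated backend process for each "session" or "resource" that needs to be synchronized (for example a shared dashboard, dashboard:123), and let that process own its state. Elixir's GenServer is the natural tool for this. It is a supervised process that handles its messages one at a time, acting as a state machine.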
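The property that makes a GenServer a good fit is that it takes messages from its mailbox one at a time, so two updates to the same session never interleave. The JavaScript sketch below imitates that with an explicit queue. SessionActor, send, and the message shape ({ id, partial, followUp }) are hypothetical names for illustration. Unlike BEAM processes, it runs on a single thread with no supervision or isolation.

```javascript
// Hypothetical single-session "actor": messages are queued and handled one at
// a time, mimicking how a GenServer drains its mailbox serially.
class SessionActor {
  constructor(sessionId, initialState) {
    this.sessionId = sessionId;
    this.state = initialState;
    this.mailbox = [];
    this.draining = false;
    this.log = []; // records when each message starts and finishes
  }

  send(message) {
    this.mailbox.push(message);
    // If a message is already being handled (e.g. a handler sent a message to
    // itself), the new one waits its turn instead of running re-entrantly.
    if (this.draining) return;
    this.draining = true;
    try {
      while (this.mailbox.length > 0) {
        this.handle(this.mailbox.shift());
      }
    } finally {
      this.draining = false;
    }
  }

  handle(message) {
    this.log.push(`${message.id}:start`);
    if (message.partial) this.state = { ...this.state, ...message.partial };
    if (message.followUp) this.send(message.followUp); // queued, not nested
    this.log.push(`${message.id}:end`);
  }
}

const actor = new SessionActor("dashboard_123", { title: "Untitled Dashboard", value: 0 });
actor.send({ id: "a", partial: { value: 1 }, followUp: { id: "b", partial: { title: "Ops" } } });
actor.send({ id: "c", partial: { value: 2 } });
// actor.log   → ["a:start", "a:end", "b:start", "b:end", "c:start", "c:end"]
// actor.state → { title: "Ops", value: 2 }
```

Message "b" is sent while "a" is still being handled, yet it starts only after "a" ends. That is the ordering guarantee the SessionServer relies on.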
We will create a SessionServer to manage the state of a given session. A Phoenix Channel acts as the gateway through which clients interact with it.
graph TD
    subgraph "Android App (Jetpack Compose)"
        A[OkHttp WebSocket] --> B{Phoenix Channel Client}
    end
    subgraph "Web App (PWA)"
        C[phoenix.js WebSocket] --> D{Phoenix Channel Client}
    end
    subgraph "Elixir/Phoenix Backend"
        E[Phoenix Channel Endpoint]
        F(SessionChannel)
        G((GenServer per Session))
        H(PubSub)
    end
    B --> E
    D --> E
    E --> F
    F -- interacts with --> G
    G -- broadcasts via --> H
    H -- pushes to --> F
    F -- pushes to --> B
    F -- pushes to --> D
1. The State Core: SessionServer
This GenServer holds and updates the state of one session. After each update, it broadcasts the new state to the session's topic through Phoenix PubSub.
lib/live_sync/session_server.ex
defmodule LiveSync.SessionServer do
  use GenServer
  require Logger

  # Public API

  def start_link(session_id) do
    GenServer.start_link(__MODULE__, %{session_id: session_id, state: initial_state()},
      name: via_tuple(session_id)
    )
  end

  def get_state(session_id) do
    GenServer.call(via_tuple(session_id), :get_state)
  end

  def update_state(session_id, new_partial_state) do
    GenServer.cast(via_tuple(session_id), {:update_state, new_partial_state})
  end

  # GenServer callbacks

  @impl true
  def init(args) do
    Logger.info("SessionServer for #{args.session_id} started.")
    {:ok, args}
  end

  @impl true
  def handle_call(:get_state, _from, state) do
    {:reply, state.state, state}
  end

  @impl true
  def handle_cast({:update_state, partial_state}, state) do
    # In production, validation and more careful merge logic belong here.
    # Map.merge/2 is shallow: keys in partial_state overwrite existing keys.
    new_state_map = Map.merge(state.state, partial_state)

    # Broadcast the new state to every client joined to this session's topic
    topic = "session:#{state.session_id}"
    LiveSyncWeb.Endpoint.broadcast!(topic, "state_updated", %{state: new_state_map})
    Logger.debug("State updated for #{state.session_id}: #{inspect(new_state_map)}")

    {:noreply, %{state | state: new_state_map}}
  end

  @impl true
  def terminate(reason, state) do
    # Not guaranteed to run on every shutdown path (e.g. only when trapping exits)
    Logger.warning("SessionServer for #{state.session_id} terminating. Reason: #{inspect(reason)}")
    :ok
  end

  # Private helpers

  defp via_tuple(session_id), do: {:via, Registry, {LiveSync.SessionRegistry, session_id}}

  # String keys match the string-keyed JSON payloads that clients push
  defp initial_state do
    %{
      "title" => "Untitled Dashboard",
      "value" => 0,
      "last_updated_by" => nil
    }
  end
end
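We use a Registry (LiveSync.SessionRegistry, a unique-key Registry that must be started in the application's supervision tree) to manage these GenServer processes dynamically and look each one up by its session_id.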
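Looking a session up by session_id and starting it only if it is missing has to be idempotent. Two clients joining dashboard_123 at the same moment should end up talking to the same process. With a via-Registry name, a second start for an already-registered name returns {:error, {:already_started, pid}}, which the caller can treat as success. A separate "look up, then start" check is racy by comparison. The JavaScript sketch below mirrors that contract. SessionRegistry, startChild and getOrStart are hypothetical names, and the result objects only imitate the shape of the Elixir tuples.

```javascript
// Hypothetical registry keyed by session id, mirroring the Elixir contract:
// starting an already-registered name reports "already_started" together with
// the existing instance instead of creating a duplicate.
class SessionRegistry {
  constructor(factory) {
    this.factory = factory;    // creates a new session instance
    this.sessions = new Map(); // sessionId -> instance
  }

  startChild(sessionId) {
    const existing = this.sessions.get(sessionId);
    if (existing) return { ok: false, reason: "already_started", instance: existing };
    const instance = this.factory(sessionId);
    this.sessions.set(sessionId, instance);
    return { ok: true, instance };
  }

  // Both outcomes count as success from the caller's point of view.
  getOrStart(sessionId) {
    return this.startChild(sessionId).instance;
  }
}

let created = 0;
const registry = new SessionRegistry((id) => ({ id, createdIndex: ++created }));
const first = registry.getOrStart("dashboard_123");
const second = registry.getOrStart("dashboard_123");
// first === second, and the factory ran only once (created === 1)
```

In single-threaded JavaScript the check and the insert cannot be interrupted, so the race disappears trivially. In Elixir, the same guarantee comes from the Registry's unique keys.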
2. The Gateway: SessionChannel
This Channel handles clients joining and leaving, as well as the events they push.
lib/live_sync_web/channels/session_channel.ex
defmodule LiveSyncWeb.SessionChannel do
  use Phoenix.Channel
  require Logger

  alias LiveSync.SessionServer

  @impl true
  def join("session:" <> session_id, _payload, socket) do
    # In a real project, authentication and authorization are mandatory here.

    # Ensure the session's GenServer is running. It is started under a
    # DynamicSupervisor (assumed to be named LiveSync.SessionSupervisor and started
    # in application.ex) instead of being linked to this channel process, so the
    # session outlives any single client. A second start for the same name returns
    # {:error, {:already_started, pid}}, which avoids a join race.
    case DynamicSupervisor.start_child(LiveSync.SessionSupervisor, {SessionServer, session_id}) do
      {:ok, _pid} -> :ok
      {:error, {:already_started, _pid}} -> :ok
    end

    # No manual PubSub subscription is needed: Phoenix subscribes the channel to
    # its topic on join, so Endpoint.broadcast!/3 on it reaches this client.

    # Get the current state and send it to the newly joined client
    current_state = SessionServer.get_state(session_id)
    # :user_id is only present if UserSocket.connect/3 assigns it
    Logger.info("Client #{inspect(socket.assigns[:user_id])} joined session:#{session_id}")

    {:ok, %{state: current_state}, assign(socket, :session_id, session_id)}
  end

  # Client pushes an update event
  @impl true
  def handle_in("update_state", payload, socket) do
    session_id = socket.assigns.session_id
    # Add payload validation logic here.
    # Forward the update request to the GenServer
    SessionServer.update_state(session_id, payload)
    {:noreply, socket}
  end

  # Broadcasts from the server bypass handle_out/3 (unless the event is listed
  # with intercept/1) and are sent to the clients directly.

  @impl true
  def terminate(reason, socket) do
    Logger.warning("Channel for session #{socket.assigns[:session_id]} terminated. Reason: #{inspect(reason)}")
    :ok
  end
end
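With that, the core backend logic is in place. It creates a state process for each session on demand and exposes it to clients through a Channel. Authorization still has to be added in join/3 before this is safe to expose.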
Step 2: Integrating the Jetpack Compose Native Client
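The hard part on Android is finding a stable Phoenix Channels client that works well with coroutines. We chose a community-maintained library and wrapped it to fit our MVVM architecture.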
1. Dependencies and WebSocket Client Setup
We need a WebSocket client (such as OkHttp) and a library that implements the Phoenix Channels protocol. Here we use phoenix-channels-client-kotlin.
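The protocol such a library implements is fairly small. With Phoenix's version 2.0.0 JSON serializer (selected by the vsn query parameter, which phoenix.js sends as vsn=2.0.0), every frame is a JSON array [join_ref, ref, topic, event, payload]. A join is sent as a phx_join event, replies arrive as phx_reply with a {status, response} payload, and heartbeats go to the special phoenix topic. The JavaScript sketch below only encodes and decodes such frames. The helper names are hypothetical, and refs are plain incrementing strings as phoenix.js uses.

```javascript
// Hypothetical helpers for Phoenix's V2 JSON wire format:
// every frame is [join_ref, ref, topic, event, payload].
function encodeFrame({ joinRef = null, ref = null, topic, event, payload = {} }) {
  return JSON.stringify([joinRef, ref, topic, event, payload]);
}

function decodeFrame(text) {
  const [joinRef, ref, topic, event, payload] = JSON.parse(text);
  return { joinRef, ref, topic, event, payload };
}

// Refs are opaque strings; an incrementing counter is enough for a sketch.
let refCounter = 0;
const nextRef = () => String(++refCounter);

// Joining a topic: the join message's own ref doubles as its join_ref.
function joinFrame(topic, params) {
  const ref = nextRef();
  return encodeFrame({ joinRef: ref, ref, topic, event: "phx_join", payload: params });
}

// Heartbeats keep the connection alive and use the special "phoenix" topic.
function heartbeatFrame() {
  return encodeFrame({ ref: nextRef(), topic: "phoenix", event: "heartbeat" });
}

const join = joinFrame("session:dashboard_123", { client: "Android" });
// join === '["1","1","session:dashboard_123","phx_join",{"client":"Android"}]'

// A server reply to that join, as it would arrive on the socket:
const reply = decodeFrame(
  '["1","1","session:dashboard_123","phx_reply",{"status":"ok","response":{"state":{"value":0}}}]'
);
// reply.payload.status === "ok" and reply.payload.response.state.value === 0
```

The response object inside phx_reply is what join/3 returned as its reply map ({:ok, %{state: current_state}, socket}). That is why the clients read a state key from the join reply.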
build.gradle.kts
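dependencies {
    // WebSocket transport (the artifact id is "okhttp")
    implementation("com.squareup.okhttp3:okhttp:4.11.0")
    implementation("com.github.dsrees:phoenix-channels-client-kotlin:2.0.0")
    // JSON mapping used by SessionRepository
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.15.2")
    // Coroutines (Dispatchers.Main on Android) & ViewModel
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.7.3")
    implementation("androidx.lifecycle:lifecycle-viewmodel-ktx:2.6.2")
    // viewModel() in Compose
    implementation("androidx.lifecycle:lifecycle-viewmodel-compose:2.6.2")
}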
2. A Wrapping SessionRepository
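To decouple the communication logic from the UI, we create a Repository that manages the Channel connection and exposes the session state as a flow.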
SessionRepository.kt
import android.util.Log
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import com.github.dsrees.phoenix.Channel
import com.github.dsrees.phoenix.Socket
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.launch
import okhttp3.OkHttpClient
import java.util.concurrent.TimeUnit

// The server sends snake_case keys ("last_updated_by"), so map that key
// explicitly; otherwise Jackson rejects it as an unknown property.
@JsonIgnoreProperties(ignoreUnknown = true)
data class SessionState(
    val title: String = "Loading...",
    val value: Int = 0,
    @JsonProperty("last_updated_by") val lastUpdatedBy: String? = null
)

class SessionRepository(private val scope: CoroutineScope) {
    private val _sessionState = MutableStateFlow(SessionState())
    val sessionState = _sessionState.asStateFlow()

    private var socket: Socket? = null
    private var channel: Channel? = null
    private val mapper = jacksonObjectMapper()

    companion object {
        // 10.0.2.2 is the host machine as seen from the Android emulator
        private const val SOCKET_URL = "ws://10.0.2.2:4000/socket/websocket"
        private const val TAG = "SessionRepository"
    }

    fun connect(sessionId: String) {
        if (socket?.isConnected == true) {
            Log.w(TAG, "Already connected.")
            return
        }

        val okHttpClient = OkHttpClient.Builder()
            .connectTimeout(10, TimeUnit.SECONDS)
            .readTimeout(10, TimeUnit.SECONDS)
            .build()

        socket = Socket(SOCKET_URL, okHttpClient).apply {
            onOpen { Log.i(TAG, "Socket opened.") }
            onClose { Log.w(TAG, "Socket closed.") }
            onError { t, _ -> Log.e(TAG, "Socket error", t) }
        }

        val topic = "session:$sessionId"
        channel = socket?.channel(topic, mapOf("client" to "Android"))?.apply {
            join()
                .receive("ok") { message ->
                    scope.launch(Dispatchers.Main) {
                        // The join reply carries the initial state. This assumes
                        // payload values are JSON nodes whose toString() is JSON;
                        // for decoded Maps, use mapper.convertValue instead.
                        val initialStateJson = message.payload.get("state").toString()
                        _sessionState.value = mapper.readValue(initialStateJson)
                        Log.i(TAG, "Joined channel successfully. Initial state received.")
                    }
                }
                .receive("error") {
                    Log.e(TAG, "Failed to join channel: ${it.payload}")
                }
                .receive("timeout") {
                    Log.e(TAG, "Join timeout.")
                }

            // Listen for state update broadcasts from the server
            on("state_updated") { message ->
                scope.launch(Dispatchers.Main) {
                    val stateJson = message.payload.get("state").toString()
                    _sessionState.value = mapper.readValue(stateJson)
                    Log.d(TAG, "State updated from broadcast: ${_sessionState.value}")
                }
            }
        }

        socket?.connect()
    }

    fun updateValue(newValue: Int) {
        if (channel?.isJoined != true) {
            Log.e(TAG, "Cannot update state, channel not joined.")
            return
        }
        scope.launch(Dispatchers.IO) {
            // In a real project, updates need richer state tracking and acknowledgements.
            val payload = mapOf(
                "value" to newValue,
                "last_updated_by" to "AndroidClient"
            )
            channel?.push("update_state", payload)
        }
    }

    fun disconnect() {
        channel?.leave()
        socket?.disconnect()
        socket = null
        channel = null
        Log.i(TAG, "Disconnected.")
    }
}
3. ViewModel and Compose UI
SessionViewModel owns the Repository and exposes its StateFlow to the UI.
SessionViewModel.kt
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope

class SessionViewModel : ViewModel() {
    private val repository = SessionRepository(viewModelScope)
    val sessionState = repository.sessionState

    fun connectToSession(sessionId: String) {
        repository.connect(sessionId)
    }

    fun incrementValue() {
        // Sends an absolute value computed from this client's last-seen state.
        // If another client updates "value" concurrently, the last write wins
        // and one of the increments is lost.
        val currentValue = sessionState.value.value
        repository.updateValue(currentValue + 1)
    }

    override fun onCleared() {
        super.onCleared()
        repository.disconnect()
    }
}
The UI layer only needs to observe this StateFlow.
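Observing is enough because of how a StateFlow behaves. It always holds a current value, hands that value to every new collector immediately, and ignores assignments equal to the current value. The JavaScript approximation below shows why the screen first renders the placeholder "Loading..." state and then the join reply. The StateHolder class and shallowEqual helper are hypothetical, and the sketch skips StateFlow's coroutine machinery and its conflation of rapid updates.

```javascript
// Flat structural equality, standing in for a Kotlin data class's equals().
function shallowEqual(a, b) {
  const keysA = Object.keys(a);
  const keysB = Object.keys(b);
  return (
    keysA.length === keysB.length &&
    keysA.every((k) => Object.prototype.hasOwnProperty.call(b, k) && Object.is(a[k], b[k]))
  );
}

// Hypothetical approximation of StateFlow's contract.
class StateHolder {
  constructor(initial) {
    this._value = initial;
    this.collectors = new Set();
  }

  get value() {
    return this._value;
  }

  set value(next) {
    if (shallowEqual(this._value, next)) return; // equal values are not re-emitted
    this._value = next;
    for (const collector of this.collectors) collector(next);
  }

  collect(callback) {
    this.collectors.add(callback);
    callback(this._value); // a new collector sees the current value at once
    return () => this.collectors.delete(callback);
  }
}

const holder = new StateHolder({ title: "Loading...", value: 0, lastUpdatedBy: null });
const rendered = [];
holder.collect((s) => rendered.push(s.title));
holder.value = { title: "Untitled Dashboard", value: 0, lastUpdatedBy: null }; // join reply
holder.value = { title: "Untitled Dashboard", value: 0, lastUpdatedBy: null }; // duplicate, ignored
// rendered → ["Loading...", "Untitled Dashboard"]
```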
SessionScreen.kt
import androidx.compose.foundation.layout.*
import androidx.compose.material3.*
import androidx.compose.runtime.*
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.unit.dp
import androidx.lifecycle.viewmodel.compose.viewModel

@Composable
fun SessionScreen(viewModel: SessionViewModel = viewModel()) {
    val state by viewModel.sessionState.collectAsState()

    LaunchedEffect(Unit) {
        // Connect to a specific session
        viewModel.connectToSession("dashboard_123")
    }

    Column(
        modifier = Modifier.fillMaxSize().padding(16.dp),
        horizontalAlignment = Alignment.CenterHorizontally,
        verticalArrangement = Arrangement.Center
    ) {
        Text(text = state.title, style = MaterialTheme.typography.headlineMedium)
        Spacer(modifier = Modifier.height(24.dp))
        Text(text = "Current Value: ${state.value}", style = MaterialTheme.typography.titleLarge)
        Spacer(modifier = Modifier.height(8.dp))
        Text(
            text = "Last updated by: ${state.lastUpdatedBy ?: "N/A"}",
            style = MaterialTheme.typography.bodySmall
        )
        Spacer(modifier = Modifier.height(32.dp))
        Button(onClick = { viewModel.incrementValue() }) {
            Text("Increment from Android")
        }
    }
}
The Android app can now connect to our Elixir backend and send and receive state updates in real time.
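One consequence of this design deserves a concrete example. incrementValue() reads the value this client last saw and pushes the absolute result (currentValue + 1). Suppose two clients both see 5 and tap at the same time. Both push {"value": 6}, and the server's shallow merge keeps 6, so one increment is lost. The JavaScript sketch below replays that interleaving and contrasts it with a relative operation that the server applies in its own serialized order. The increment_by field and applyIncrement function are hypothetical; the backend shown above does not handle such a message.

```javascript
// Server-side update by shallow merge, like Map.merge/2 in SessionServer.
function mergeUpdate(state, partial) {
  return { ...state, ...partial };
}

// Hypothetical relative operation: the server computes the new value itself.
function applyIncrement(state, { increment_by }) {
  return { ...state, value: state.value + increment_by };
}

let server = { value: 5 };

// Both clients read value 5 before either update reaches the server.
const seenByA = server.value;
const seenByB = server.value;
server = mergeUpdate(server, { value: seenByA + 1, last_updated_by: "A" });
server = mergeUpdate(server, { value: seenByB + 1, last_updated_by: "B" });
const absoluteResult = server.value; // 6: client A's increment was overwritten

// Same interleaving, but the clients send deltas applied in arrival order.
let server2 = { value: 5 };
server2 = applyIncrement(server2, { increment_by: 1 });
server2 = applyIncrement(server2, { increment_by: 1 });
const deltaResult = server2.value; // 7: both increments survive
```

The GenServer already handles one message at a time, so a delta applied inside handle_cast/2 could not interleave with another update to the same session.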
Step 3: Integrating the PWA Client
Integration on the web is comparatively simple, because Phoenix ships an official JavaScript client, phoenix.js.
assets/js/app.js
import { Socket } from "phoenix";

class SessionConnector {
  constructor(sessionId, stateUpdateCallback) {
    this.sessionId = sessionId;
    this.stateUpdateCallback = stateUpdateCallback;
    this.socket = new Socket("/socket", { params: { token: window.userToken } });
    this.channel = null;
  }

  connect() {
    // Create the channel only once; phoenix.js handles reconnects itself.
    if (this.channel) return;

    this.socket.connect();
    const topic = `session:${this.sessionId}`;
    this.channel = this.socket.channel(topic, { client: "PWA" });

    this.channel.join()
      .receive("ok", resp => {
        // Receive the initial state. This callback also runs after each
        // automatic rejoin, so the UI resyncs to the latest server state.
        console.log("Joined PWA to session successfully", resp);
        this.stateUpdateCallback(resp.state);
      })
      .receive("error", resp => {
        // A common pitfall: reconnection must be handled properly, or network
        // glitches leave the client stuck with stale state. phoenix.js already
        // reconnects the socket and retries the join with backoff, so calling
        // connect() again here would only stack duplicate channels and handlers.
        console.error("Unable to join PWA to session", resp);
      });

    // Listen for state updates
    this.channel.on("state_updated", payload => {
      console.log("Received state update:", payload.state);
      this.stateUpdateCallback(payload.state);
    });
  }

  updateTitle(newTitle) {
    if (!this.channel) {
      console.error("Cannot update, channel is not initialized.");
      return;
    }
    // handle_in/3 on the server answers {:noreply, socket}, so no "ok" reply
    // arrives for this push; acknowledgements would need {:reply, :ok, socket}.
    this.channel.push("update_state", {
      title: newTitle,
      last_updated_by: "PWAClient"
    })
      .receive("error", e => console.error("Update failed", e));
  }
}

// UI binding
document.addEventListener('DOMContentLoaded', () => {
  const titleEl = document.getElementById('session-title');
  const valueEl = document.getElementById('session-value');
  const updatedByEl = document.getElementById('session-updated-by');
  const updateBtn = document.getElementById('update-title-btn');

  const updateUI = (state) => {
    titleEl.textContent = state.title;
    valueEl.textContent = `Current Value: ${state.value}`;
    updatedByEl.textContent = `Last updated by: ${state.last_updated_by || 'N/A'}`;
  };

  const connector = new SessionConnector("dashboard_123", updateUI);
  connector.connect();

  updateBtn.addEventListener('click', () => {
    const newTitle = `PWA Title @ ${new Date().toLocaleTimeString()}`;
    connector.updateTitle(newTitle);
  });
});
Now, tapping the button in the Android app to increment the value updates the PWA immediately. Conversely, updating the title in the PWA updates the Android UI as well.
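The two edits do not overwrite each other because each client pushes only the keys it owns. Android sends value and last_updated_by, the PWA sends title and last_updated_by, and Map.merge/2 in SessionServer replaces only those keys. Phoenix decodes pushed JSON with string keys, which is why initial_state/0 also uses string keys: the merged keys line up instead of leaving both :value and "value" in the map. The merge is shallow, so a nested value is replaced wholesale. The JavaScript sketch below uses object spread to stand in for Map.merge/2. The nested layout field is hypothetical and only illustrates that shallowness.

```javascript
// Shallow merge, analogous to Map.merge(state, partial) in SessionServer:
// keys in `partial` replace keys in `state`; all other keys are kept.
const mergeState = (state, partial) => ({ ...state, ...partial });

const initial = { title: "Untitled Dashboard", value: 0, last_updated_by: null };

// Android increments the value; the PWA changes the title.
const afterAndroid = mergeState(initial, { value: 1, last_updated_by: "AndroidClient" });
const afterPwa = mergeState(afterAndroid, { title: "Ops Board", last_updated_by: "PWAClient" });
// afterPwa → { title: "Ops Board", value: 1, last_updated_by: "PWAClient" }

// Shallowness: a nested object (hypothetical "layout") is replaced, not merged.
const withLayout = mergeState(afterPwa, { layout: { columns: 2, theme: "dark" } });
const afterLayoutEdit = mergeState(withLayout, { layout: { columns: 3 } });
// afterLayoutEdit.layout → { columns: 3 } (theme is gone)
```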
Step 4: A CI/CD Pipeline for the Mobile App
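A reliable CI/CD pipeline for the native app is essential. It should not only build the app but also run tests to make sure the real-time sync logic has not been broken. We use GitHub Actions to demonstrate the process.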
.github/workflows/android_ci.yml
name: Android CI/CD

on:
  push:
    branches: [ "main" ]
  pull_request:
    branches: [ "main" ]

jobs:
  build_and_test:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up JDK 17
        uses: actions/setup-java@v4
        with:
          java-version: '17'
          distribution: 'temurin'
          cache: 'gradle'

      - name: Grant execute permission for gradlew
        run: chmod +x ./gradlew

      - name: Run unit tests
        run: ./gradlew testDebugUnitTest
        # Limitation of unit tests: they can't verify the connection to a real backend.
        # Mocking a WebSocket server is complex and can be inaccurate.

      - name: Build debug APK
        run: ./gradlew assembleDebug

      - name: Upload debug APK
        uses: actions/upload-artifact@v4
        with:
          name: app-debug
          path: app/build/outputs/apk/debug/app-debug.apk

  # A more complete pipeline should include E2E tests.
  # e2e_test:
  #   needs: build_and_test
  #   runs-on: ubuntu-latest  # service containers require a Linux runner; enable KVM for the emulator
  #   services:
  #     backend: ...          # the Elixir backend as a service container
  #   steps:
  #     - ...
  #     - name: Run Android instrumented tests
  #       uses: reactivecircus/android-emulator-runner@v2
  #       with:
  #         api-level: 29
  #         script: ./gradlew connectedCheck
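This pipeline covers the basic build and unit tests. A truly production-grade pipeline would be considerably more involved and would include an e2e_test job that:
- starts our Elixir/Phoenix application in a service container;
- boots an Android emulator with a tool such as android-emulator-runner;
- runs Android instrumented tests that talk directly to the containerized backend and verify that end-to-end real-time synchronization works. This step is key to keeping the system resilient.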
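Even without a full E2E setup, a client's state handling can be unit-tested without a network by injecting a fake channel. The JavaScript sketch below uses a hypothetical FakeChannel that implements only the part of the phoenix.js Channel surface the PWA code uses (join().receive(...), on, push). It is a test double, not part of phoenix.js, and it replies synchronously where the real channel replies asynchronously. An Android equivalent would likely put the Channel behind an interface that SessionRepository depends on.

```javascript
// Hypothetical test double for the phoenix.js Channel subset used by the PWA.
class FakeChannel {
  constructor(joinReply) {
    this.joinReply = joinReply; // e.g. { status: "ok", response: { state: {...} } }
    this.handlers = new Map();
    this.pushed = [];
  }

  join() {
    const reply = this.joinReply;
    // Chainable like a phoenix.js Push, but it replies synchronously.
    const push = {
      receive(status, callback) {
        if (status === reply.status) callback(reply.response);
        return push;
      },
    };
    return push;
  }

  on(event, callback) {
    this.handlers.set(event, callback);
  }

  push(event, payload) {
    this.pushed.push({ event, payload });
    return { receive() { return this; } };
  }

  // Test helper: simulate a server broadcast arriving on this channel.
  serverBroadcast(event, payload) {
    const handler = this.handlers.get(event);
    if (handler) handler(payload);
  }
}

// Client logic under test, shaped like SessionConnector.connect() but with
// the channel injected instead of created from a real Socket.
function wireSession(channel, onState) {
  channel.join()
    .receive("ok", (resp) => onState(resp.state))
    .receive("error", (resp) => console.error("join failed", resp));
  channel.on("state_updated", (payload) => onState(payload.state));
}

const channel = new FakeChannel({ status: "ok", response: { state: { title: "T", value: 0 } } });
const states = [];
wireSession(channel, (state) => states.push(state.value));
channel.serverBroadcast("state_updated", { state: { title: "T", value: 1 } });
channel.push("update_state", { title: "New", last_updated_by: "PWAClient" });
// states → [0, 1]; channel.pushed holds the single update_state push
```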
Limitations and Future Iterations
The current architecture works well, but it has some clear limitations:
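- GenServer state persistence: the GenServer's state currently lives only in memory, so it is lost if the process crashes or the server restarts. ETS (Erlang Term Storage) can keep state across a GenServer crash if the table is owned by a longer-lived process, but it is still in memory and does not survive a node restart. Where durability matters, add an external store such as PostgreSQL or Redis, load the state when the GenServer starts, and write to the store on every update.
- Cluster scaling: once a single machine can no longer host all the GenServer processes, the Elixir application has to scale out to a multi-node cluster. Libraries such as Horde provide a distributed Registry and DynamicSupervisor, so clients connected to any node can find the right state process. Phoenix PubSub already delivers broadcasts across connected nodes.
- Offline clients: the current implementation cannot guarantee consistency while a client is offline. After reconnecting, the client simply receives the latest state, and nothing merges edits made while offline. Applications that need offline editing and conflict resolution (such as collaborative documents) require CRDTs (Conflict-free Replicated Data Types) or Operational Transformation.
- CI/CD maturity: the end-to-end tests shown in the pipeline are conceptual. A stable, efficient E2E environment that includes a real backend and emulators is a challenging engineering project in its own right.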
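To give a flavor of the CRDT route, here is a grow-only counter (G-Counter), one of the simplest CRDTs, sketched in JavaScript for the dashboard's value field. Each replica increments only its own slot, merging takes the per-replica maximum, and the counter's value is the sum of all slots. Replicas that edited offline therefore converge to the same total whatever order they sync in. The replica ids are hypothetical. A G-Counter only supports increments; decrements would need a PN-Counter built from two G-Counters.

```javascript
// G-Counter CRDT: a map of replicaId -> number of increments made by that replica.
class GCounter {
  constructor(replicaId, counts = {}) {
    this.replicaId = replicaId;
    this.counts = { ...counts };
  }

  // A replica only ever increments its own entry.
  increment(by = 1) {
    if (by < 0) throw new Error("G-Counter cannot decrement");
    this.counts[this.replicaId] = (this.counts[this.replicaId] || 0) + by;
  }

  // The observed value is the sum over all replicas.
  value() {
    return Object.values(this.counts).reduce((sum, n) => sum + n, 0);
  }

  // Merge is a per-replica max: commutative, associative and idempotent,
  // so replicas converge regardless of the order in which they sync.
  merge(other) {
    for (const [id, n] of Object.entries(other.counts)) {
      this.counts[id] = Math.max(this.counts[id] || 0, n);
    }
  }
}

// Two devices edit while offline, starting from the same synced state.
const android = new GCounter("android-1");
const pwa = new GCounter("pwa-1");
android.increment();
android.increment();
pwa.increment();

// After reconnecting, each side merges the other's state.
android.merge(pwa);
pwa.merge(android);
// android.value() === 3 and pwa.value() === 3
```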