2009年12月22日 星期二

多 Local Connection 彼此維護名單的機制

有時候我們作 Local Connection 的應用時,都需要一個 "主 Flash" 負責管理有多少個 Flash 端點,我這裡說的不是只有兩個 Flash 在互相連線而已,而是多個 Flash 需要互相連線、交換資訊的情況。

這個情況可以想像是 "遊戲大廳" 與 "各遊戲" 之間的關係,不管各遊戲開開關關,只要遊戲大廳還存在,就可以負責為各遊戲之間作資料交換的工作。但假使遊戲大廳不存在呢?那麼各遊戲就都必須充當遊戲大廳的角色去認識所有其他遊戲才行。

這個機制現在的雛形,可作幾件事情:

* 新開啟的遊戲(Flash 應用程式),會先尋找(有限數量)是否有已經開啟的任一遊戲;
若沒找到,自己就成為第一個遊戲;
若找到,就請該遊戲提供 "現有遊戲的連線表"、"可用連線連線名稱" 給自己,自己使用 "可用連線連線名稱" 後,就廣播通知 "現有遊戲的連線表" 中的所有遊戲。

* 當某個 遊戲 a 要向 遊戲 b 傳遞訊息發生錯誤時,遊戲 a 會將自己擁有的連線表中去除 遊戲 b,並廣播告知 遊戲 c、d、e 更新名單。



現行機制是每個遊戲端點都存有名單,並且名單異動都要通知大家。原本我有考慮另一種作法,在這個網絡中只有一個 "主要端點" 來維護名單,如果某端點有需要作 "廣播" 的動作時,就必須通知 主要端點 來作。這種作法的步驟大致是:

* 新端點開啟時,必須先尋找任一存在端點,若找不到,自己成為第一端點,也是主要端點。若找到,則該端點會回應告知 誰才是主要端點,然後此新端點 再去向主要端點作註冊的動作。

* 當任一端點 a 斷線時,其它端點連不到他,就只是單純送出相關事件;若主要端點連不到 a,就會將 a 從名單去除,之後的廣播就不會送給 a。

* 當主要端點斷線時,若其他端點欲連主要端點時發現斷線,則第一個發現斷線的端點將成為新的主要端點,此時必須搜尋網絡中所有存在的端點,並在自己體內建立新的連線表,然後廣播告知大家誰是新的主要端點。

大致這樣,不過後來因為實作上很亂,所以才改成現在的版本,所有端點都存名單。



以下是完整程式碼:


package
LocalP2PMain.as -- 測試程式


package {
import flash.display.SimpleButton;
import flash.display.Sprite;
import flash.events.ErrorEvent;
import flash.events.MouseEvent;
import flash.text.TextField;
import flash.text.TextFieldAutoSize;
import flash.text.TextFieldType;

import idv.ben.lp2p.LocalP2P;
import idv.ben.lp2p.events.ConnectEvent;
import idv.ben.lp2p.events.LocalP2PEvent;
import idv.ben.lp2p.events.ReceiveMessageEvent;

[SWF(width="640", height="480")]
public class LocalP2PMain extends Sprite
{
static private var CMD_ID_1:int = LocalP2P.getNextCommandID();
static private var CMD_ID_2:int = LocalP2P.getNextCommandID();

private var _msg_tf:TextField;

private var _txt1_tf:TextField;
private var _send1_sb:SimpleButton;

private var _lp2p:LocalP2P;

public function LocalP2PMain()
{
init();
}

private function init():void{
initUI();

initLP2P();
}

private function initUI():void{
_msg_tf = new TextField();
_msg_tf.x = 10;
_msg_tf.y = 10;
_msg_tf.width = 620;
_msg_tf.height = 400;
_msg_tf.type = TextFieldType.INPUT;
_msg_tf.background = true;
_msg_tf.wordWrap = true;
addChild(_msg_tf);

_txt1_tf = new TextField();
_txt1_tf.x = _msg_tf.x;
_txt1_tf.y = _msg_tf.y + _msg_tf.height + 10;
_txt1_tf.width = 120;
_txt1_tf.height = 20;
_txt1_tf.text = "{receiver conn name}";
_txt1_tf.type = TextFieldType.INPUT;
_txt1_tf.background = true;
addChild(_txt1_tf);

_send1_sb = createSimpleButton("Send1");
_send1_sb.x = _txt1_tf.x + _txt1_tf.width + 10;
_send1_sb.y = _txt1_tf.y;
addChild(_send1_sb);
}

private function createSimpleButton(label:String):SimpleButton{
var state:Sprite = new Sprite();
state.graphics.beginFill(0xffcc00);
state.graphics.drawRect(0, 0, 50, 20);
state.graphics.endFill();

var txt:TextField = new TextField();
txt.text = label;
txt.autoSize = TextFieldAutoSize.LEFT;
state.addChild(txt);

var sb:SimpleButton = new SimpleButton(state, state, state, state);
return sb;
}

private function initLP2P():void{
_lp2p = new LocalP2P();
_lp2p.addEventListener(LocalP2PEvent.CONN_FULL, onLP2P_connFull);
_lp2p.addEventListener(LocalP2PEvent.READY, onLP2P_ready);
_lp2p.addEventListener(LocalP2PEvent.MAP_UPDATE, onLP2P_mapUpdate);
_lp2p.addEventListener(ErrorEvent.ERROR, onLP2P_error);
_lp2p.addEventListener(ConnectEvent.DISCONNECT, onLP2P_disconnect);
_lp2p.addEventListener(ReceiveMessageEvent.RECEIVE, onLP2P_receiveMessage);
_lp2p.connect();
}

private function onLP2P_connFull(e:LocalP2PEvent):void{
addMsg("onLP2P_connFull()");
}

private function onLP2P_ready(e:LocalP2PEvent):void{
addMsg("onLP2P_ready()");

initUIinteractive();
}

private function onLP2P_mapUpdate(e:LocalP2PEvent):void{
addMsg("onLP2P_mapUpdate()");

var map:Array = _lp2p.getAllLCNames();
for(var i:int=0; i < map.length; i++){
addMsg(i + ", " + map[i]);
}
}

private function onLP2P_error(e:ErrorEvent):void{
addMsg("onLP2P_error(), " + e);
}

private function onLP2P_disconnect(e:ConnectEvent):void{
addMsg("連線失敗: " + e.connName);
}

private function onLP2P_receiveMessage(e:ReceiveMessageEvent):void{
addMsg("onLP2P_receiveMessage(), sender=" + e.sender + ", cmd=" + e.cmd + ", args=" + e.args);
}

private function addMsg(msg:String):void{
var dt:Date = new Date();
var strDT:String = "[" + dt.hours + ":" + dt.minutes + ":" + dt.seconds + "." + dt.milliseconds + "] - ";

_msg_tf.appendText(strDT + msg + "\n");
_msg_tf.scrollV = _msg_tf.maxScrollV;
}

private function initUIinteractive():void{
_send1_sb.addEventListener(MouseEvent.CLICK, onSend1Click);
}

private function onSend1Click(e:MouseEvent):void{
addMsg("onSend1Click(), " + _txt1_tf.text + ", " + CMD_ID_1 + ", ABC, 123");
_lp2p.send(_txt1_tf.text, CMD_ID_1, "ABC", "123");
}
}
}


package idv.ben.lp2p
LocaclP2P.as -- 主要邏輯


package idv.ben.lp2p
{
import flash.events.AsyncErrorEvent;
import flash.events.ErrorEvent;
import flash.events.EventDispatcher;
import flash.events.SecurityErrorEvent;
import flash.events.StatusEvent;
import flash.net.LocalConnection;

import idv.ben.lp2p.events.AnyOnePeerFinderEvent;
import idv.ben.lp2p.events.ConnectEvent;
import idv.ben.lp2p.events.LocalP2PEvent;
import idv.ben.lp2p.events.ReceiveMessageEvent;
import idv.ben.lp2p.utils.ParamSerialization;

[Event(name="connFull", type="idv.ben.lp2p.events.LocalP2PEvent")]
[Event(name="ready", type="idv.ben.lp2p.events.LocalP2PEvent")]
[Event(name="mapUpdate", type="idv.ben.lp2p.events.LocalP2PEvent")]
[Event(name="disconnect", type="idv.ben.lp2p.events.ConnectEvent")]
[Event(name="receive", type="idv.ben.lp2p.events.ReceiveMessageEvent")]
public class LocalP2P extends EventDispatcher
{
static public const MAX_CONN:int = 10;
static public const CONN_NAME_SURFIX:String = "LocalP2P_ConnName_";
static public const RECEIVE_METHOD_NAME:String = "receiveMsg";

static private var CMD_ID:int = 0;
static public function getNextCommandID():int{
return ++CMD_ID;
}
static public const CMD_FIND_ANYPEER_REQ:int = getNextCommandID();
static public const CMD_FIND_ANYPEER_RESP:int = getNextCommandID();
static public const CMD_GET_AVAIBLE_CONNNAME_REQ:int = getNextCommandID();
static public const CMD_GET_AVAIBLE_CONNNAME_RESP:int = getNextCommandID();
static public const CMD_UPDATE_MAP:int = getNextCommandID();

static private var _instance:LocalP2P;
static public function getInstance():LocalP2P{
if(_instance==null)
_instance = new LocalP2P();
return _instance;
}

private var _connName:String;
private var _localLC:LocalConnection;

//每送給不同 conn name 的時候,都建立一個新的 LC
private var _remoteLCs:Object = new Object();

private var _allLCNames:Array = new Array();
public function getAllLCNames():Array{
return _allLCNames.concat(); //提供複製本
}

public function LocalP2P()
{
if(_instance==null){
_instance = this;
}else{
throw new Error("plz use getInstance()");
}
}

public function connect():void{
trace(this, "connect()");

_connName = "Random_ConnName_LocalP2P_" + (new Date()).getTime();

_localLC = new LocalConnection();
_localLC.client = this;
_localLC.connect(_connName);

//找任一個端點
var finder:AnyOnePeerFinder = new AnyOnePeerFinder();
finder.addEventListener(AnyOnePeerFinderEvent.FOUND, onConnect_foundPeer);
finder.addEventListener(AnyOnePeerFinderEvent.NOT_FOUND, onConnect_notFoundPeer);
finder.find();
}

private function onConnect_foundPeer(e:AnyOnePeerFinderEvent):void{
trace(this, "onConnect_foundPeer()");

//取得可用端點名稱
send(e.connName, CMD_GET_AVAIBLE_CONNNAME_REQ);
}

private function onConnect_notFoundPeer(e:AnyOnePeerFinderEvent):void{
trace(this, "onConnect_notFoundPeer()");

initLC(CONN_NAME_SURFIX + 1); //第一個端點
}

private function initLC(connName:String):void{
trace(this, "initLC()", connName);

_connName = connName;

_localLC = new LocalConnection();
_localLC.client = this;
_localLC.addEventListener(AsyncErrorEvent.ASYNC_ERROR, onLocalLCError);
_localLC.addEventListener(SecurityErrorEvent.SECURITY_ERROR, onLocalLCError);
_localLC.connect(_connName);
dispatchEvent(new LocalP2PEvent(LocalP2PEvent.READY));

_allLCNames.push(_connName);
dispatchEvent(new LocalP2PEvent(LocalP2PEvent.MAP_UPDATE));
}

private function onLocalLCError(e:ErrorEvent):void{
trace(this, "onLocalLCError()", e);
dispatchEvent(e);
}

public function send(receiver:String, cmd:int, ...args):void{
trace(this, "send()receiver=" + receiver + ", cmd=" + cmd + ", args=" + args);

if(args!=null)
args = ParamSerialization.serialization(args);

var remoteLC:LocalConnection;
if(receiver=="*"){
for(var i:int=0; i < _allLCNames.length; i++){
if(_allLCNames[i] != _connName){
remoteLC = new LocalConnection();
remoteLC.addEventListener(StatusEvent.STATUS, onStatus);
_remoteLCs[_allLCNames[i]] = remoteLC;
remoteLC.send(_allLCNames[i], RECEIVE_METHOD_NAME
, _connName, cmd, args);
}
}
}else{
remoteLC = new LocalConnection();
remoteLC.addEventListener(StatusEvent.STATUS, onStatus);
_remoteLCs[receiver] = remoteLC;
remoteLC.send(receiver, RECEIVE_METHOD_NAME
, _connName, cmd, args);
}
}

private function onStatus(e:StatusEvent):void{
trace(this, "onStatus()", e);

if(e.level == "error"){

var errConnName:String;

for(var connName:String in _remoteLCs){
if(_remoteLCs[connName] == e.currentTarget){

errConnName = connName;

//連線失敗
var evt:ConnectEvent = new ConnectEvent(ConnectEvent.DISCONNECT);
evt.connName = errConnName;
dispatchEvent(evt);

var idx:int = _allLCNames.indexOf(errConnName);
if(idx!=-1){
_allLCNames.splice(idx, 1);
dispatchEvent(new LocalP2PEvent(LocalP2PEvent.MAP_UPDATE));

//廣播,更新連線表
send("*", CMD_UPDATE_MAP, _allLCNames);

return;
}

_remoteLCs[connName] = null;
}
}
}
}

public function receiveMsg(sender:String, cmd:int, args:Array=null):void{
trace(this, "receiveMsg()sender=" + sender + ", cmd=" + cmd + ", args=" + args);

if(args!=null)
args = ParamSerialization.deserialization(args);

if(cmd==CMD_FIND_ANYPEER_REQ){
receiveMsg_findAnyPeerReq(sender, cmd, args);
}else if(cmd==CMD_GET_AVAIBLE_CONNNAME_REQ){
receiveMsg_getAvaibleConnNameReq(sender, cmd, args);
}else if(cmd==CMD_GET_AVAIBLE_CONNNAME_RESP){
receiveMsg_getAvaibleConnNameResp(sender, cmd, args);
}else if(cmd==CMD_UPDATE_MAP){
receiveMsg_updateMap(sender, cmd, args);
}

var evt:ReceiveMessageEvent = new ReceiveMessageEvent(ReceiveMessageEvent.RECEIVE);
evt.sender = sender;
evt.cmd = cmd;
evt.args = args;
dispatchEvent(evt);
}

private function receiveMsg_findAnyPeerReq(sender:String, cmd:int, args:Array):void{
trace(this, "receiveMsg_findAnyPeerReq()");

send(sender, CMD_FIND_ANYPEER_RESP);
}

private function receiveMsg_getAvaibleConnNameReq(sender:String, cmd:int, args:Array):void{
trace(this, "receiveMsg_getAvaibleConnNameReq()");

var avaibleConnName:String = "";
for(var i:int=1; i <= MAX_CONN; i++){
avaibleConnName = CONN_NAME_SURFIX + i;
if(_allLCNames.indexOf(avaibleConnName)==-1){
send(sender, CMD_GET_AVAIBLE_CONNNAME_RESP, _allLCNames, avaibleConnName);
return;
}
}

send(sender, CMD_GET_AVAIBLE_CONNNAME_RESP, avaibleConnName);
}

private function receiveMsg_getAvaibleConnNameResp(sender:String, cmd:int, args:Array):void{
trace(this, "receiveMsg_getAvaibleConnNameResp()");

var allLCNames:Array = args[0] as Array;
var avaibleConnName:String = String(args[1]);

if(avaibleConnName!=""){
this.initLC(avaibleConnName);

_allLCNames = allLCNames;
_allLCNames.push(avaibleConnName);
dispatchEvent(new LocalP2PEvent(LocalP2PEvent.MAP_UPDATE));

//廣播,更新連線表
send("*", CMD_UPDATE_MAP, _allLCNames);
}else{
dispatchEvent(new LocalP2PEvent(LocalP2PEvent.CONN_FULL));
}
}

private function receiveMsg_updateMap(sender:String, cmd:int, args:Array):void{
trace(this, "receiveMsg_getAvaibleConnNameResp()");

_allLCNames = args[0];
dispatchEvent(new LocalP2PEvent(LocalP2PEvent.MAP_UPDATE));
}


}
}

package idv.ben.lp2p
AnyOnePeerFinder.as -- 新開啟的端點,要去找任一個已經存在的端點時


package idv.ben.lp2p
{
import flash.events.AsyncErrorEvent;
import flash.events.ErrorEvent;
import flash.events.EventDispatcher;
import flash.events.SecurityErrorEvent;
import flash.events.StatusEvent;
import flash.net.LocalConnection;

import idv.ben.lp2p.events.AnyOnePeerFinderEvent;
import idv.ben.lp2p.utils.ParamSerialization;

[Event(name="found", type="idv.ben.lp2p.events.AnyOnePeerFinderEvent")]
[Event(name="notFound", type="idv.ben.lp2p.events.AnyOnePeerFinderEvent")]
public class AnyOnePeerFinder extends EventDispatcher
{
private var _connName:String;
private var _lc:LocalConnection;
private var _lc2:LocalConnection;

private var _findIdx:int = 0;

public function AnyOnePeerFinder()
{
_connName = "Random_ConnName_AnyOnePeerFinder_" + (new Date()).getTime();
_lc = new LocalConnection();
_lc.client = this;
_lc.connect(_connName);

_lc2 = new LocalConnection();
_lc2.addEventListener(StatusEvent.STATUS, onStatus, false, 0, true);
_lc2.addEventListener(AsyncErrorEvent.ASYNC_ERROR, onError, false, 0, true);
_lc2.addEventListener(SecurityErrorEvent.SECURITY_ERROR, onError, false, 0, true);
}

public function find():void{
++_findIdx;

trace(this, "find()", _findIdx);

if(_findIdx <= LocalP2P.MAX_CONN){
_lc2.send(LocalP2P.CONN_NAME_SURFIX + _findIdx, LocalP2P.RECEIVE_METHOD_NAME
, _connName
, LocalP2P.CMD_FIND_ANYPEER_REQ);
}else{
dispatchEvent(new AnyOnePeerFinderEvent(AnyOnePeerFinderEvent.NOT_FOUND));
_findIdx = 0;
}
}

private function onStatus(e:StatusEvent):void{
trace(this, "onStatus()", e);

if(e.level == "error"){
find();
}
}

private function onError(e:ErrorEvent):void{
trace(this, "onError()", e);

find();
}

public function receiveMsg(sender:String, cmd:int, args:Array=null):void{
trace(this, "receiveMsg()");

if(args!=null)
args = ParamSerialization.deserialization(args);

if(cmd==LocalP2P.CMD_FIND_ANYPEER_RESP){
receiveMsg_findAnyPeer(sender, cmd, args);
}
}

private function receiveMsg_findAnyPeer(sender:String, cmd:int, args:Array):void{
trace(this, "receiveMsg_findAnyPeer()");

var evt:AnyOnePeerFinderEvent = new AnyOnePeerFinderEvent(AnyOnePeerFinderEvent.FOUND);
evt.connName = sender;
dispatchEvent(evt);
}

}
}

package idv.ben.lp2p.utils
ParamSerialization.as -- 端點間 傳輸資料時,將參數進行序列化


package idv.ben.lp2p.utils
{
import com.adobe.serialization.json.JSONDecoder;
import com.adobe.serialization.json.JSONEncoder;

public class ParamSerialization
{
static public function serialization(params:Array):Array{

if(params==null)return null;

var resp:Array = new Array();

for(var i:int=0; i < params.length; i++){
resp.push((new JSONEncoder(params[i])).getString());
}

return resp;
}

static public function deserialization(params:Array):Array{

if(params==null)return null;

var resp:Array = new Array();

for(var i:int=0; i < params.length; i++){
resp.push((new JSONDecoder(params[i])).getValue());
}

return resp;
}

}
}

package idv.ben.lp2p.events
AnyOnePeerFinderEvent.as -- 一些事件


package idv.ben.lp2p.events
{
import flash.events.Event;

public class AnyOnePeerFinderEvent extends Event
{
static public const FOUND:String = "found";
static public const NOT_FOUND:String = "notFound";

public var connName:String;

public function AnyOnePeerFinderEvent(type:String, bubbles:Boolean=false, cancelable:Boolean=false)
{
super(type, bubbles, cancelable);
}

}
}

package idv.ben.lp2p.events
ConnectEvent.as -- 一些事件


package idv.ben.lp2p.events
{
import flash.events.Event;

public class ConnectEvent extends Event
{
static public const DISCONNECT:String = "disconnect";

public var connName:String = "";

public function ConnectEvent(type:String, bubbles:Boolean=false, cancelable:Boolean=false)
{
super(type, bubbles, cancelable);
}

}
}

package idv.ben.lp2p.events
LocalP2PEvent.as -- 一些事件


package idv.ben.lp2p.events
{
import flash.events.Event;

public class LocalP2PEvent extends Event
{
static public const CONN_FULL:String = "connFull";
static public const READY:String = "ready";
static public const MAP_UPDATE:String = "mapUpdate";

public function LocalP2PEvent(type:String, bubbles:Boolean=false, cancelable:Boolean=false)
{
super(type, bubbles, cancelable);
}

}
}

package idv.ben.lp2p.events
ReceiveMessageEvent.as -- 一些事件


package idv.ben.lp2p.events
{
import flash.events.Event;

public class ReceiveMessageEvent extends Event
{
static public const RECEIVE:String = "receive";

public var sender:String = "";

public var cmd:int = 0;

public var args:Array = null;

public function ReceiveMessageEvent(type:String, bubbles:Boolean=false, cancelable:Boolean=false)
{
super(type, bubbles, cancelable);
}

}
}
下圖是測試畫面: * 測試開啟新端點時,其他現有的端點如何回應 * 端點 a 送訊息給 端點 b 的時後 * 端點 a 廣播訊息 的時後

4 則留言:

{id: "Ticore"}; // 提到...

Cool!
好奇在開發過程中,這樣大量使用 LC 有無遇到 Crash 情況?

Ben Chang 提到...

開發中,若 Flex Builder 執行帶有 debug player 的瀏覽器(我的是IE),與直接去 / bin-debug / 中用其他瀏覽器開啟該 html,有時候會造成其中一個開半天都沒甚麼反映 (文字欄位沒輸出、也沒出現 debug player 的 error),因為若沒反應的那個瀏覽器不是 FB 正在測試的 session 的話,那麼 trace 的資料也不會顯示在 FB 中,所以我也還沒查出那個疑似 Crash 的情況是甚麼。

去掉以上的混搭測試環境,若都是直接開啟 html 作測試,那麼就不會有異常。

我確實還沒調整一些清理資源的工作,也因為還沒用在實際專案上,所以還看不出問題。

Ben Chang 提到...

未來實際用在專案時,我覺得比較擔心的問題是效能,若是 client 端有很耗用 CPU 的程式在執行的話,就有可能造成 Local Connection 傳輸太慢,所以未來還要想想如何減少彼此間的傳輸次數與量。

{id: "Ticore"}; // 提到...

恩,Debug 版 SWF 效能比較差,也容易發生問題,所以後來我幾乎都不用~~
印象中本地端傳資料速度應該很快才是,也許是因為 Flash Player 是另外配置 low priority thread 去處理這工作。
還是因為 Local Connection 有 40k size 限制?