9年互聯(lián)網(wǎng)/大數(shù)據(jù)領(lǐng)域研發(fā)、架構(gòu)經(jīng)驗(yàn),2015年加入京東物流,主要負(fù)責(zé)大數(shù)據(jù)相關(guān)架構(gòu)與開發(fā)工作。
摘要: 數(shù)據(jù)蜂巢平臺是京東物流自主研發(fā)的分布式、高性能、高可用、支持異構(gòu),離線和實(shí)時(shí)的大數(shù)據(jù)同步與管理平臺。關(guān)鍵技術(shù):HA;離線與實(shí)時(shí)同步整合;binlog采集,存儲與訂閱;客戶端并發(fā)消費(fèi);一致性校驗(yàn)與修復(fù);任務(wù)隔離。
目前已經(jīng)在京東物流系統(tǒng)中大規(guī)模應(yīng)用,比如單源和多源復(fù)制,從全國各地倉儲園區(qū)集群(上百個(gè))實(shí)時(shí)復(fù)制到IDC,從mysql到ES,從mysql到cassandra等等。倉儲園區(qū)硬件、網(wǎng)絡(luò)環(huán)境復(fù)雜,數(shù)據(jù)蜂巢平臺需要考慮硬件設(shè)施和網(wǎng)絡(luò)故障的容錯(cuò)性。議題主要分享平臺誕生的背景,使用的關(guān)鍵技術(shù),架構(gòu)的演進(jìn)過程,演進(jìn)過程中所踩過的坑。
正文:
京東物流一線數(shù)據(jù)大部分都存在MySQL數(shù)據(jù)庫上,分布比較廣,包括國內(nèi)外園區(qū)庫房和IDC,這樣數(shù)據(jù)使用起來極不方便,各業(yè)務(wù)系統(tǒng)為了使用數(shù)據(jù),開發(fā)出多種版本的同步工具,結(jié)果導(dǎo)致管理非常困難以及資源的浪費(fèi)。這時(shí)就需要一個(gè)統(tǒng)一的平臺把這些數(shù)據(jù)管理起來。
架構(gòu)設(shè)計(jì)
關(guān)于數(shù)據(jù)同步,主要分為:批量同步、實(shí)時(shí)訂閱和實(shí)時(shí)同步。
批量同步: 采用sqoop的模式,把數(shù)據(jù)分片,然后進(jìn)行多機(jī)并發(fā)復(fù)制,用以提升效率。
實(shí)時(shí)訂閱:使用消息隊(duì)列,即將binlog事件解析生成對應(yīng)的消息存儲在隊(duì)列中;
實(shí)時(shí)同步:通過客戶端去消費(fèi)隊(duì)列將數(shù)據(jù)寫入目標(biāo)存儲,從而實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)同步。
如何中將以上三個(gè)功能整合到一個(gè)平臺?
首先,適配批量同步,參照sqoop, sqoop是借助于hadoop集群提交一個(gè)mr作業(yè), 類似的,蜂巢系統(tǒng)也提供一個(gè)集群,將一個(gè)批量同步作業(yè)分為多task去執(zhí)行。實(shí)時(shí)訂閱也可使用這種思路,不同的是每一個(gè)task即對應(yīng)一臺mysql實(shí)例的binlog采集;同理實(shí)時(shí)同步也可分成多個(gè)task,每一個(gè)task即為一個(gè)消息隊(duì)列的消費(fèi)客戶端。
架構(gòu)圖如下所示:
采用了經(jīng)典的master、slave結(jié)構(gòu),每一個(gè)slave上可以跑三種對應(yīng)的任務(wù)。BatchWorker負(fù)責(zé)把MySQL數(shù)據(jù)批量同步到storage里面,這里的storage是一個(gè)抽象,不一定就是存儲之類的系統(tǒng),也有可能是一些業(yè)務(wù)的處理。StreamWorker負(fù)責(zé)binlog的采集和消息隊(duì)列的維護(hù),如果有訂閱需求,使用系統(tǒng)提供的客戶端Consumer就可以直接消費(fèi)。
Pieworker則是Consumer的分裝,只不過這個(gè)客戶端由集群去管理維護(hù)。
BatchWorker結(jié)構(gòu)比較簡單,主要由Fetcher、Sinker和Storage(與整體架構(gòu)圖中的storage不同,此處為一個(gè)buffer)組成。Fetcher接口負(fù)責(zé)抽取數(shù)據(jù),Storage負(fù)責(zé)緩存數(shù)據(jù),Sinker接口負(fù)責(zé)數(shù)據(jù)的寫入。
StreamWorker模仿MySQL的主從復(fù)制機(jī)制, RelayLogTask負(fù)責(zé)binlog的抽取;HHLTask負(fù)責(zé)解析binlog,生成消息體,最后存在hhl中,hhl即為上文提到消息隊(duì)列。
客戶端: StreamJob除了采集binlog,維護(hù)消息隊(duì)列外,還提供了一個(gè)ClientServer模塊,用于接收客戶端的消費(fèi)請求。當(dāng)Consumer需要消費(fèi)的時(shí)候,請求會發(fā)送到ClientServer,ClientServer通過索引快速的定位到hhl文件的某一個(gè)位置,然后把數(shù)據(jù)讀取出來,然后經(jīng)過客戶端的指定過濾規(guī)則進(jìn)行過濾,最終將消息體傳送給客戶端。
消息的位點(diǎn)由三部分組成的:Serverld對應(yīng)的就是MySQL的Serverld;binlogPosition是一個(gè)長整型數(shù)字,高32位為binlog文件下標(biāo),低32位為文件內(nèi)部的位置;time為對應(yīng)的binlog事件產(chǎn)生的時(shí)間。
為解決性能問題,客戶端支持多種并行模式。
第一種是以事務(wù)為單位串行處理;
第二種是在第一種基礎(chǔ)上進(jìn)行了一個(gè)簡單的優(yōu)化,比如:一個(gè)事務(wù)內(nèi)只對單表操作,那么這個(gè)事務(wù)是完全可以并發(fā)的,不同表間的事務(wù)順序并不會影響最終結(jié)果的一致性;但事務(wù)內(nèi)對多表操作就需要繼續(xù)串行。所以第二種并發(fā)模式就是不斷串行并行轉(zhuǎn)變的過程。
以上兩種消費(fèi)模式是以事務(wù)為單位,以下兩種以行為單位
第三種是表級并發(fā),將事務(wù)拆分為多個(gè)行級操作,同表操作由同一線程完成,保障同一表操作的有序性;
第四種是行級并發(fā),主要是把MySQL的數(shù)據(jù)同步到NOSQL上時(shí)使用,關(guān)系型數(shù)據(jù)庫同步時(shí)局限性較大,因?yàn)樾屑壊l(fā)只保障同一主鍵的操作有序,而關(guān)系型數(shù)據(jù)庫會存在多個(gè)唯一約束,這樣即使保障了主鍵的操作有序也可能引起數(shù)據(jù)不一致。
集群的特性:作為一個(gè)集群都需要要保證高可用、數(shù)據(jù)本地性和負(fù)載均衡。高可用這一塊主要分為三部分:
MySQL: 由DBA保證數(shù)據(jù)庫的高可用,但當(dāng)Mysql主從切換時(shí),binlog的位點(diǎn)是不一致的,此時(shí)系統(tǒng)通過Serverld的檢測發(fā)現(xiàn)該變更,然后通過時(shí)間在新的mysql實(shí)列上定位正確的binlog位點(diǎn)。
master(Queen):基于Zookeeper完成Active角色的選舉
Bee(Slave):Bee宕機(jī)后由Master將其運(yùn)行的任務(wù)遷移到其他的Slave上。
數(shù)據(jù)本地性:每一個(gè)Bee在啟動(dòng)時(shí)都配置了機(jī)房,分組等信息,作業(yè)提交時(shí)可以指定自己期望的運(yùn)行位置,與hadoop,spark類似。
負(fù)載均衡: 每一個(gè)Bee會將自己的負(fù)載信息通過心跳發(fā)給Queen,queen進(jìn)行作業(yè)調(diào)度時(shí),會在滿足數(shù)據(jù)本地性的前提下選擇壓力最小的機(jī)器去運(yùn)行新任務(wù)。
演進(jìn)
HHL文件丟失:上文提到過,如果Streamworker的運(yùn)行主機(jī)宕機(jī),Master會把它遷移到另外一臺機(jī)器上,但是Streamworker采集解析的binlog存在本地,遷移后會引起數(shù)據(jù)丟失,解決這個(gè)問題通用的方案是多副本,但大數(shù)據(jù)量下的多復(fù)本會造成磁盤空間的浪費(fèi),尤其是在庫房環(huán)境下。
并且這些數(shù)據(jù)有一個(gè)特點(diǎn),就是發(fā)生遷移時(shí),雖然解析過的數(shù)據(jù)丟失,但是原始binlog都會在機(jī)器上保存(dba會保留n天的binlog數(shù)據(jù)),最終可以通過數(shù)據(jù)補(bǔ)全來保證數(shù)據(jù)不丟失。
上圖為第一版streamworker的擴(kuò)展,最右面的可以認(rèn)為是這一組的主線程,虛線位置發(fā)生任務(wù)遷移,切換到了新的主機(jī)上,此時(shí)虛線左邊的數(shù)據(jù)全部丟失。如果有客戶端需要消費(fèi)丟失的數(shù)據(jù),服務(wù)端則啟動(dòng)一組新線程,然后進(jìn)行catchup,catchup會把丟失的部分補(bǔ)齊并提供給客戶端消費(fèi)。
元數(shù)據(jù): binlog是不記錄字段名等元數(shù)據(jù)的,而客戶端消費(fèi)時(shí)需要。最簡單的方式是收集到binlog之后,去源庫上查詢,但在binlog采集延遲期間如果有ddl操作,會導(dǎo)致元數(shù)據(jù)不準(zhǔn)確。為解決該問題系統(tǒng)實(shí)現(xiàn)了一個(gè)快照模塊。
在StreamJob初次啟動(dòng)的時(shí)候,把對應(yīng)的MySQL里面所有表都做一份快照,在此后的運(yùn)行期間監(jiān)控DDL操作,當(dāng)解析到DDL操作時(shí)會將原快照取出生成一個(gè)復(fù)本,并在這個(gè)復(fù)本上應(yīng)用這個(gè)ddl,生成新的快照 。這樣系統(tǒng)可以保證任何時(shí)刻binlog對應(yīng)的元數(shù)據(jù)都是正確的,方便用戶使用。
客戶端:服務(wù)端并不記錄客戶端的消費(fèi)位點(diǎn),消費(fèi)的位置由客戶端自行存儲。由于客戶端采用的是并發(fā)消費(fèi)模式,消息又是嚴(yán)格有序的,此時(shí)位點(diǎn)記錄就必須保障每一個(gè)記錄下的位點(diǎn)之前的所有消息都被正確處理了,此處引入了一個(gè)環(huán)形提交隊(duì)列(具體實(shí)現(xiàn)與disruptor類似)。
當(dāng)連續(xù)的多個(gè)消息被正確處理,并達(dá)到記錄位點(diǎn)的間隔,此時(shí)提交隊(duì)列會將一個(gè)位點(diǎn)寫入對應(yīng)的存儲介質(zhì)。比如1,2,3,4,5,8,10,14被處理完成,位點(diǎn)提交間隔為5,則5位置對應(yīng)的位點(diǎn)被記錄,當(dāng)6,7,9被處理完成后10再被記錄。
以下主要為易用性的改進(jìn):
SQL: 用戶通過SQL描述需要同步哪些字段,同步條件等信息,服務(wù)端通過解析sql執(zhí)行對應(yīng)的同步邏輯。
Union: 用于處理多表合一的場景,通過加入來源標(biāo)識字段來解決唯一約束。
Join: 在同步的過程中完成寬表的加工,內(nèi)部通過緩存,布隆過濾器等優(yōu)化方案來提升性能。
一致性較驗(yàn):
系統(tǒng)提供了兩種模型:
第一種是使用pt_table_checksum的思路,比如要較驗(yàn)一張表的數(shù)據(jù),先把這張表的數(shù)據(jù)分成多個(gè)片段,然后對這些片段進(jìn)行crc計(jì)算,并將結(jié)果和計(jì)算的范圍存儲到同一個(gè)數(shù)據(jù)庫的表里去,此時(shí)會觸發(fā)一個(gè)binlog事件,當(dāng)消費(fèi)者消費(fèi)到這個(gè)事件后重現(xiàn)該操作,通過比對計(jì)算結(jié)果值來確定數(shù)據(jù)是否一致。
這種方式一是侵入性比較強(qiáng),需要在原庫上建對應(yīng)的比對結(jié)果表,二是需要加鎖,三是對延遲的要求很高,當(dāng)延遲較大時(shí),消費(fèi)端拿不到對應(yīng)的比對事件,將無法確定數(shù)據(jù)一致性。
第二種數(shù)據(jù)校驗(yàn)的模型是基于BatchJob實(shí)現(xiàn)。
Fetcher抽取源和目標(biāo)雙方的數(shù)據(jù),排序后通過storage把數(shù)據(jù)傳遞給sinker,Sinker根據(jù)用戶自定義的比較接口對數(shù)據(jù)進(jìn)行比較,最后將差異通過Collector進(jìn)行收集。這種比對方式的缺點(diǎn)是不能保證時(shí)間序列,在比對的時(shí)候數(shù)據(jù)是變更的,比對出來的結(jié)果可能并不是真正的差異。
此時(shí)我們需要修復(fù)比對結(jié)果,首先在比對開始時(shí)把消費(fèi)端的位點(diǎn)記錄下來,比對完成后,如果有差異則從比對開始前記錄的位點(diǎn)進(jìn)行binlog重放,通過分析binlog中操作,對差異進(jìn)行修復(fù),輸出最終結(jié)果。
該比對模型最大的問題是需要抽取比對雙方的數(shù)據(jù),對帶寬占用較大,為減少網(wǎng)絡(luò)傳輸,內(nèi)部會對數(shù)據(jù)進(jìn)行初步的篩選,將要比對的數(shù)據(jù)分成多個(gè)片段,在存儲端對每個(gè)片段進(jìn)行md5計(jì)算,fetcher只收集md5值 ,只有雙方md5值不同時(shí)才將真正的數(shù)據(jù)抽取到計(jì)算端進(jìn)行比較。
修復(fù):
系統(tǒng)提供兩種修復(fù)方式:
一是基于binlog事件傳遞,當(dāng)比對出差異數(shù)據(jù)后,只需要在源庫上對差異數(shù)據(jù)進(jìn)行一個(gè)偽操作(比如更改update_time字段),觸發(fā)binlog事件的產(chǎn)生,消費(fèi)端收到該事件即可修復(fù)錯(cuò)誤數(shù)據(jù)(事件對應(yīng)的消息體內(nèi)包含了所有對應(yīng)字段的值,并不只限于update_time字段),該方式缺點(diǎn)是受延遲的影響,同時(shí)還需要源庫的寫權(quán)限。
二是直接修復(fù),即直接在目標(biāo)上將差異數(shù)據(jù)修正。在修復(fù)數(shù)據(jù)時(shí)需要加鎖或暫停同步,避免并發(fā)問題。
資源的隔離: 系統(tǒng)默認(rèn)只提供了常用的同步功能,當(dāng)默認(rèn)實(shí)現(xiàn)無法滿足客戶需求時(shí),用戶需要自行編寫代碼來實(shí)現(xiàn)對應(yīng)的接口來完成他們的邏輯。Bee開始使用的線程模型,即每一個(gè)Bee可以看做一個(gè)線程池,多個(gè)任務(wù)都在同一線程池內(nèi)運(yùn)行,而用戶自定義代碼又無法控制,導(dǎo)致不同任務(wù)相互影響,更有甚者任務(wù)結(jié)束后資源不釋放。
為保證資源隔離,將線程改為進(jìn)程,將不同作業(yè)的任務(wù)交由不同的子進(jìn)程去執(zhí)行,子進(jìn)程啟動(dòng)時(shí)會指定額定運(yùn)行資源;所有子進(jìn)程由Bee統(tǒng)一管理,Bee不再運(yùn)行具體任務(wù),作業(yè)結(jié)束后,Bee將未退出的子進(jìn)程全部強(qiáng)制殺死。
“一帶一路”背景下,物流裝備企業(yè)的全球化發(fā)展 01月01日 08:00
解讀汽車物流全局?jǐn)?shù)字化 08月16日 14:11
“大數(shù)據(jù)與智慧物流”專題報(bào)道 12月06日 14:24
2017年中國零售電商十大熱點(diǎn)事件點(diǎn)評 12月11日 16:35
供應(yīng)鏈協(xié)同、互聯(lián)網(wǎng)+,中國制造業(yè)轉(zhuǎn)型的十個(gè)方向! 01月11日 10:42
預(yù)判:2018年,中國快遞十個(gè)可見趨勢 02月28日 11:16
2018年關(guān)于零售的18個(gè)趨勢 03月27日 10:51