久久久精品一区ed2k-女人被男人叉到高潮的视频-中文字幕乱码一区久久麻豆樱花-俄罗斯熟妇真实视频

ES譯文之如何使用Logstash實(shí)現(xiàn)關(guān)系型數(shù)據(jù)庫(kù)與ElasticSearch之間的數(shù)據(jù)同

譯者前言
近期的主要工作是在為公司的 APP 增加搜索功能。因?yàn)橐灿龅搅诵枰殃P(guān)系型數(shù)據(jù)庫(kù)中的數(shù)據(jù)同步 ElasticSearch 中的問(wèn)題,故抽了點(diǎn)時(shí)間翻譯了這篇官方的博文。最近,在數(shù)據(jù)同步方面也有些思考。
本篇文章的重點(diǎn)不在 Logstash 的 JDBC 插件的使用方法,而是數(shù)據(jù)同步會(huì)遇到的一些細(xì)節(jié)問(wèn)題如何處理。我覺(jué)得,這些設(shè)計(jì)思想是通用的,無(wú)論你使用的何種方式進(jìn)行數(shù)據(jù)同步。
翻譯正文

成都創(chuàng)新互聯(lián)專(zhuān)注于方正網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗(yàn)。 熱誠(chéng)為您提供方正營(yíng)銷(xiāo)型網(wǎng)站建設(shè),方正網(wǎng)站制作、方正網(wǎng)頁(yè)設(shè)計(jì)、方正網(wǎng)站官網(wǎng)定制、微信小程序定制開(kāi)發(fā)服務(wù),打造方正網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供方正網(wǎng)站排名全網(wǎng)營(yíng)銷(xiāo)落地服務(wù)。

為了利用 ElasticSearch 強(qiáng)大的搜索能力,大部分的業(yè)務(wù)都會(huì)在關(guān)系型數(shù)據(jù)庫(kù)的基礎(chǔ)上部署 ElasticSearch。這類(lèi)場(chǎng)景下,保持 ElasticSearch 和關(guān)系型數(shù)據(jù)庫(kù)之間的數(shù)據(jù)同步是非常必要的。
本篇博文將會(huì)介紹如何通過(guò) Logstash 實(shí)現(xiàn)在 MySQL 和 ElasticSearch 之間數(shù)據(jù)的高效復(fù)制與同步。
注:文中演示的代碼和方法都經(jīng)過(guò)在 MySQL 中的測(cè)試,理論上適應(yīng)于所有的關(guān)系型數(shù)據(jù)庫(kù)。
本文中,組件的相關(guān)信息如下:

MySQL:  8.0.16.
Elasticsearch: 7.1.1
Logstash: 7.1.1
Java: 1.8.0_162-b12
JDBC input plugin: v4.3.13
JDBC connector: Connector/J 8.0.16

數(shù)據(jù)同步概述
本文將會(huì)通過(guò) Logstash 的 JDBC input 插件進(jìn)行 ElasticSearch 和 MySQL 之間的數(shù)據(jù)同步。從概念上講,JDBC 插件將通過(guò)周期性的輪詢以發(fā)現(xiàn)上次迭代后的新增和更新的數(shù)據(jù)。為了正常工作,幾個(gè)條件需要滿足:
ElasticSearch 中 _id 設(shè)置必須來(lái)自 MySQL 中 id 字段。它提供了 MySQL 和 ElasticSearch 之間文檔數(shù)據(jù)的映射關(guān)系。如果一條記錄在 MySQL 更新,那么,ElasticSearch 所有關(guān)聯(lián)文檔都應(yīng)該被重寫(xiě)。要說(shuō)明的是,重寫(xiě) ElasticSearch 中的文檔和更新操作的效率相同。在內(nèi)部實(shí)現(xiàn)上,一個(gè)更新操作由刪除一個(gè)舊文檔和創(chuàng)建一個(gè)新文檔兩部分組成。
當(dāng) MySQL 中插入或更新一條記錄時(shí),必須包含一個(gè)字段用于保存字段的插入或更新時(shí)間。如此一來(lái), Logstash 就可以實(shí)現(xiàn)每次請(qǐng)求只獲取上次輪詢后更新或插入的記錄。Logstash 每次輪詢都會(huì)保存從 MySQL 中讀取到的最新的插入或更新時(shí)間,該時(shí)間大于上次輪詢最新時(shí)間。
如果滿足了上述條件,我們就可以配置 Logstash 周期性的從 MySQL 中讀取所有最新更新或插入的記錄,然后寫(xiě)入到 Elasticsearch 中。
關(guān)于 Logstash 的配置代碼,本文稍后會(huì)給出。
MySQL 設(shè)置
MySQL 庫(kù)和表的配置如下:

CREATE DATABASE es_db

USE es_db

DROP TABLE IF EXISTS es_table

CREATE TABLE es_table (
  id BIGINT(20) UNSIGNED NOT NULL,
  PRIMARY KEY (id),
  UNIQUE KEY unique_id (id),
  client_name VARCHAR(32) NOT NULL,
  modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

配置中有幾點(diǎn)需要說(shuō)明,如下:

es_table,MySQL 的數(shù)據(jù)表,我們將把它的數(shù)據(jù)同步到 ElasticSearch 中;
id,記錄的唯一標(biāo)識(shí)。注意,id 定義為主鍵的同時(shí),也定義為唯一建,可以保證每個(gè) id 在表中只出現(xiàn)一次。同步 ElasticSearch 時(shí),將會(huì)轉(zhuǎn)化為文檔的 _id;
client_name,表示用戶定義用來(lái)保存數(shù)據(jù)的字段,為使博文保持簡(jiǎn)潔,我們只定義了一個(gè)字段,更多字段也很容易加入。接下來(lái)的演示,我們會(huì)更新該字段,用以說(shuō)明不僅僅新插入記錄會(huì)同步到 MySQL,更新記錄同樣會(huì)同步到 MySQL;
modification_time,用于保存記錄的更新或插入時(shí)間,它使得 Logstash 可以在每次輪詢時(shí)只請(qǐng)求上次輪詢后新增更新的記錄;
insertion_time,該字段用于一條記錄插入時(shí)間,主要是為演示方便,對(duì)同步而言,并非必須;

MySQL 操作
前面設(shè)置完成,我們可以通過(guò)如下命令插入記錄:

INSERT INTO es_table (id, client_name) VALUES (<id>, <client name>);`

使用如下命令更新記錄:

UPDATE es_table SET client_name = <new client name> WHERE id=<id>;

使用如下命令更新插入記錄:

INSERT INTO es_table (id, client_name) VALUES (<id>, <client_name when created>) ON DUPLICATE KEY UPDATE client_name=<client name when updated>

同步代碼
Logstash 的 pipeline 配置代碼如下,它實(shí)現(xiàn)了前面描述的功能,從 MySQL 到 ElasticSearch 的數(shù)據(jù)同步。

input {
  jdbc {
    jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
    jdbc_user => "<my username>"
    jdbc_password => "<my password>"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *",
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time)) > :sql_last_value AND modification_time < NOW() ORDER BY modification_time desc"
  }
}
filter {
  mutate {
    copy => { "id" => "[@metadata][_id]"}
    remove_field => ["id", "@version", "unix_ts_in_secs"]
  }
}

output {
  # stdout { codec => "rubydebug" }
  elasticsearch {
    index => "rdbms_sync_idx"
    document_id => "%{[%metedata][_id]}"
  }
}

關(guān)于 Pipeline 配置的幾點(diǎn)說(shuō)明,如下:

tracking_column

此處配置為 "unix_ts_in_secs"。它被用于追蹤最新的記錄,并被保存在 .logstash_jdbc_last_run 文件中,下一次輪詢將以這個(gè)邊界位置為準(zhǔn)進(jìn)行記錄獲取。SELECT 語(yǔ)句中,可通過(guò) :sql_last_value 訪問(wèn)該配置字段的值。

unix_ts_in_secs

由 SELECT 語(yǔ)句生成,是 "modification_time" 的 UNIX TIMESTAMP。它被前面討論的 "track_column" 引用。使用 UNIX TIMESTAMP,而非其他時(shí)間形式,可以減少?gòu)?fù)雜性,防止時(shí)區(qū)導(dǎo)致的時(shí)間不一致問(wèn)題。

sql_last_value

內(nèi)建的配置參數(shù),指定每次輪詢的開(kāi)始位置。在 input 配置中,可被 SELECT 語(yǔ)句引用。在每次輪詢開(kāi)始前,從 .logstash_jdbc_last_run 中讀取,此案例中,即為 "unix_ts_in_secs" 的最近值。如此便可保證每次輪詢只獲取最新插入和更新的記錄。

schedule

通過(guò) cron 語(yǔ)法指定輪詢的執(zhí)行周期,例子中,"/5 " 表示每 5 秒輪詢一次。

modification_time < NOW()

SELECT 語(yǔ)句查詢條件的一部分,當(dāng)前解釋不清,具體情況待下面的章節(jié)再作介紹。

filter

該配置指定將 MySQL 中的 id 復(fù)制到 metadata 字段 _id 中,用以確保 ElasticSearch 中的文檔寫(xiě)入正確的 _id。而之所以使用 metadata,因?yàn)樗桥R時(shí)的,不會(huì)使文檔中產(chǎn)生新的字段。同時(shí),我們也會(huì)把不希望寫(xiě)入 Elasticsearch 的字段 id 和 @version 移除。

output

在 output 輸出段的配置,我們指定了文檔應(yīng)該被輸出到 ElasticSearch,并且設(shè)置輸出文檔 _id 為 filter 段創(chuàng)建的 metadata 的 _id。如果需要調(diào)試,注釋部分的 rubydebug 可以實(shí)現(xiàn)。
SELECT 語(yǔ)句的正確性分析
接下來(lái),我們將開(kāi)始解釋為什么 SELECT 語(yǔ)句中包含 modification_time < NOW() 是非常重要的。為了解釋這個(gè)問(wèn)題,我們將引入兩個(gè)反例演示說(shuō)明,為什么下面介紹的兩種最直觀的方法是錯(cuò)誤的。還有,為什么 modification_time < Now() 可以克服這些問(wèn)題。
直觀場(chǎng)景一
當(dāng) where 子句中僅僅包含 UNIX_TIMESTAMP(modification_time) > :sql_last_value,而沒(méi)有 modification < Now() 的情況下,工作是否正常。這個(gè)場(chǎng)景下,SELECT 語(yǔ)句是如下形式:

statement => "SELECT *, UNIX_TIMESTAMP(modification_time)
AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) >
:sql_last_value) ORDER BY modification_time ASC"

粗略一看,似乎沒(méi)發(fā)現(xiàn)什么問(wèn)題,應(yīng)該可以正常工作。但其實(shí),這里有一些邊界情況,可能導(dǎo)致一些文檔的丟失。舉個(gè)例子,假設(shè) MySQL 每秒插入兩個(gè)文檔,Logstash 每 5 秒執(zhí)行一次。如下圖所示,時(shí)間范圍 T0 至 T10,數(shù)據(jù)記錄 R1 至 R22。
ES 譯文之如何使用 Logstash 實(shí)現(xiàn)關(guān)系型數(shù)據(jù)庫(kù)與 ElasticSearch 之間的數(shù)據(jù)同

Logstash 的第一次輪詢發(fā)生在 T5 時(shí)刻,讀取記錄 R1 至 R11,即圖中青色區(qū)域。此時(shí),sql_last_value 即為 T5,這個(gè)時(shí)間是從 R11 中獲取到的。
如果,當(dāng) Logstash 完成從 MySQL 讀取數(shù)據(jù)后,同樣在 T5 時(shí)刻,又有一條記錄插入到 MySQL 中。 而下一次的輪詢只會(huì)拉取到大于 T5 的記錄,這意味著 R12 將會(huì)丟失。如圖所示,青色和灰色區(qū)域分別表示當(dāng)次和上次輪詢獲取到的記錄。
ES 譯文之如何使用 Logstash 實(shí)現(xiàn)關(guān)系型數(shù)據(jù)庫(kù)與 ElasticSearch 之間的數(shù)據(jù)同

注意,這類(lèi)場(chǎng)景下的 R12 將永遠(yuǎn)不會(huì)再被寫(xiě)入到 ElasticSearch。
直觀場(chǎng)景二
為了解決這個(gè)問(wèn)題,或許有人會(huì)想,如果把 where 子句中的大于(>)改為大于等于(>=)是否可行。SELECT 語(yǔ)句如下

statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) >= :sql_last_value) ORDER BY modification_time ASC"

這種方式其實(shí)也不理想。這種情況下,某些文檔可能會(huì)被兩次讀取,重復(fù)寫(xiě)入到 ElasticSearch 中。雖然這不影響結(jié)果的正確性,但卻做了多余的工作。如下圖所示,Logstash 的首次輪詢和場(chǎng)景一相同,青色區(qū)域表示已經(jīng)讀取的記錄。
ES 譯文之如何使用 Logstash 實(shí)現(xiàn)關(guān)系型數(shù)據(jù)庫(kù)與 ElasticSearch 之間的數(shù)據(jù)同

Logstash 的第二次輪詢將會(huì)讀取所有大于等于 T5 的記錄。如下圖所示,注意 R11,即紫色區(qū)域,將會(huì)被再次發(fā)送到 ElasticSearch 中。
ES 譯文之如何使用 Logstash 實(shí)現(xiàn)關(guān)系型數(shù)據(jù)庫(kù)與 ElasticSearch 之間的數(shù)據(jù)同

這兩種場(chǎng)景的實(shí)現(xiàn)效果都不理想。場(chǎng)景一會(huì)導(dǎo)致數(shù)據(jù)丟失,這是無(wú)法容忍的。場(chǎng)景二,存在重復(fù)讀取寫(xiě)入的問(wèn)題,雖然對(duì)數(shù)據(jù)正確性沒(méi)有影響,但執(zhí)行了多余的 IO。
終極方案
前面的兩場(chǎng)方案都不可行,我們需要繼續(xù)尋找其他解決方案。其實(shí)也很簡(jiǎn)單,通過(guò)指定 (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()),我們就可以保證每條記錄有且只發(fā)送一次。
如下圖所示,Logstash 輪詢發(fā)生在 T5 時(shí)刻。因?yàn)橹付?modification_time < NOW(),文檔只會(huì)讀取到 T4 時(shí)刻,并且 sql_last_value 的值也將會(huì)被設(shè)置為 T4。
ES 譯文之如何使用 Logstash 實(shí)現(xiàn)關(guān)系型數(shù)據(jù)庫(kù)與 ElasticSearch 之間的數(shù)據(jù)同

開(kāi)始下一次的輪詢,當(dāng)前時(shí)間 T10。
由于設(shè)置了 UNIX_TIMESTAMP(modification_time) > :sql_last_value,并且當(dāng)前 sql_last_value 為 T4,因此,本次的輪詢將從 T5 開(kāi)始。而 modification_time < NOW() 決定了只有時(shí)間小于等于 T9 的記錄才會(huì)被讀取。最后,sql_last_value 也將被設(shè)置為 T9。
ES 譯文之如何使用 Logstash 實(shí)現(xiàn)關(guān)系型數(shù)據(jù)庫(kù)與 ElasticSearch 之間的數(shù)據(jù)同

如此,MySQL 中的每個(gè)記錄就可以做到都能被精確讀取了一次,如此就可以避免每次輪詢可能導(dǎo)致的當(dāng)前時(shí)間間隔內(nèi)數(shù)據(jù)丟失或重復(fù)讀取的問(wèn)題。
系統(tǒng)測(cè)試
簡(jiǎn)單的測(cè)試可以幫助我們驗(yàn)證配置是否如我們所愿。我們可以寫(xiě)入一些數(shù)據(jù)至數(shù)據(jù)庫(kù),如下:

INSERT INTO es_table (id, client_name) VALUES (1, 'Jim Carrey');
INSERT INTO es_table (id, client_name) VALUES (2, 'Mike Myers');
INSERT INTO es_table (id, client_name) VALUES (3, 'Bryan Adams');

一旦 JDBC 輸入插件觸發(fā)執(zhí)行,將會(huì)從 MySQL 中讀取所有記錄,并寫(xiě)入到 ElasticSearch 中。我們可以通過(guò)查詢語(yǔ)句查看 ElasticSearch 中的文檔。

`GET rdbms_sync_idx/_search`

執(zhí)行結(jié)果如下:

"hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "rdbms_sync_idx",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "insertion_time" : "2019-06-18T12:58:56.000Z",
          "@timestamp" : "2019-06-18T13:04:27.436Z",
          "modification_time" : "2019-06-18T12:58:56.000Z",
          "client_name" : "Jim Carrey"
        }
      },
Etc …

更新 id=1 的文檔,如下:

UPDATE es_table SET client_name = 'Jimbo Kerry' WHERE id=1;

通過(guò) _id = 1,可以實(shí)現(xiàn)文檔的正確更新。通過(guò)執(zhí)行如下命令查看文檔:

GET rdbms_sync_idx/_doc/1

結(jié)果如下:

{
  "_index" : "rdbms_sync_idx",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "_seq_no" : 3,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "insertion_time" : "2019-06-18T12:58:56.000Z",
    "@timestamp" : "2019-06-18T13:09:30.300Z",
    "modification_time" : "2019-06-18T13:09:28.000Z",
    "client_name" : "Jimbo Kerry"
  }
}

文檔 _version 被設(shè)置為 2,并且 modification_time 和 insertion_time 已經(jīng)不一樣了,client_name 已經(jīng)正確更新。而 @timestamp,不是我們需要關(guān)注的,它是 Logstash 默認(rèn)添加的。
更新添加 upsert 執(zhí)行語(yǔ)句如下:

INSERT INTO es_table (id, client_name) VALUES (4, 'Bob is new') ON DUPLICATE KEY UPDATE client_name='Bob exists already';

復(fù)制代碼和之前一樣,我們可以通過(guò)查看 ElasticSearch 中相應(yīng)文檔,便可驗(yàn)證同步的正確性。
文檔刪除
不知道你是否已經(jīng)發(fā)現(xiàn),如果一個(gè)文檔從 MySQL 中刪除,并不會(huì)同步到 ElasticSearch 。關(guān)于這個(gè)問(wèn)題,列舉一些可供我們考慮的方案,如下:
MySQL 中的記錄可通過(guò)包含 is_deleted 字段用以表明該條記錄是否有效。一旦發(fā)生更新,is_deleted 也會(huì)同步更新到 ElasticSearch 中。如果通過(guò)這種方式,在執(zhí)行 MySQL 或 ElasticSearch 查詢時(shí),我們需要重寫(xiě)查詢語(yǔ)句來(lái)過(guò)濾掉 is_deleted 為 true 的記錄。同時(shí),需要一些后臺(tái)進(jìn)程將 MySQL 和 ElasticSearch 中的這些文檔刪除。
另一個(gè)可選方案,應(yīng)用系統(tǒng)負(fù)責(zé) MySQL 和 ElasticSearch 中數(shù)據(jù)的刪除,即應(yīng)用系統(tǒng)在刪除 MySQL 中數(shù)據(jù)的同時(shí),也要負(fù)責(zé)將 ElasticSearch 中相應(yīng)的文檔刪除。
總結(jié)
本文介紹了如何通過(guò) Logstash 進(jìn)行關(guān)系型數(shù)據(jù)庫(kù)和 ElasticSearch 之間的數(shù)據(jù)同步。文中以 MySQL 為例,但理論上,演示的方法和代碼也應(yīng)該同樣適應(yīng)于其他的關(guān)系型數(shù)據(jù)庫(kù)。

文章標(biāo)題:ES譯文之如何使用Logstash實(shí)現(xiàn)關(guān)系型數(shù)據(jù)庫(kù)與ElasticSearch之間的數(shù)據(jù)同
標(biāo)題路徑:http://sd-ha.com/article4/jiisie.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供App設(shè)計(jì)、營(yíng)銷(xiāo)型網(wǎng)站建設(shè)、App開(kāi)發(fā)、軟件開(kāi)發(fā)、企業(yè)網(wǎng)站制作微信小程序

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

手機(jī)網(wǎng)站建設(shè)