跳转到主要内容

热门内容

今日:


总体:


最近浏览:


Chinese, Simplified

GovTech Edu的重点之一是为教育、文化、研究和技术部(MoECRT)开发数字产品(称为:应用程序),例如为教师提供的Merdeka Mengahal平台(PMM)和为大学利益相关者(学生、讲师、行业合作伙伴和从业者)提供的Kampus Merdeka平台。这些应用程序生成了大量数据,我们可以进一步利用这些数据进行分析和建模,以帮助MoECRT采取行动,改进数字产品和程序本身。鉴于生成数据的重要作用,我们可能会将应用程序生成的数据视为其本身的产品。为了优化下游的数据利用率,我们构建了简化的数据管道,就像您的普通管道一样。

Figure 1. Data Pipeline in General

我们使用Google BigQuery来存储数据,dbt用于我们的数据处理平台,Google Looker Studio作为我们的数据可视化平台。

为了支持我们提供仪表盘和每日报告的渠道——让我们从现在开始使用“报告”一词——我们监控数据质量和处理数据的基础设施成本。我们还为查询处理设置了每日配额限制。我们将这种成本监控和限制视为高度优先事项,因为我们必须尽可能深思熟虑,在不影响总成本的情况下为我们的数据获得最佳价值。

我们如何诊断和分析我们的成本?

随着用户对我们平台的采用呈指数级增长,我们的数据处理成本在前3个月持续增长了4倍以上(图2)。这导致每日配额限制受到更频繁的打击。一些ETL(或ELT)已停止运行,并且报告中未显示数字(或业务指标)。由于产品经理、部门负责人甚至部委的利益相关者等内部和外部利益相关者都需要这些报告,我们的数据分析师和科学家通过手动重新运行来帮助我们。然而,这种手动工作是不可扩展的,也没有给我们的人才带来任何最佳价值。

Figure 2. BigQuery Cost Growth in 3 Months

为了应对这些挑战,我们根据BigQuery审核日志-数据访问中的目标表评估了成本最高的BigQuery作业。我们通过以下3个步骤创建了一个成本监控面板:

  1. 通过观察模式对工作进行分类;执行者的电子邮件、作业名称、来源、目的地表等。其中一些类别包括dbt作业(ELT)、BigQuery控制台、Python和谷歌电子表格
  2. 创建dbt作业以每天计算BigQuery作业的成本,并将其存储在表中
  3. 使用此表作为源,我们制作了一个仪表板,其中包括要分析的多个可视化

我们发现dbt作业是成本最高的类别之一。

然后,从这些dbt作业中,我们根据以下标准连续选择需要处理的作业:

  1. 在这段时间内成本最高的人
  2. 使用简单的折线图查看成本趋势倾斜趋势中的工作排名会更高,下降趋势中的职位排名会更低
  3. 每个作业平均运行成本最高的作业,因为一旦这些作业需要更频繁地运行,成本就会显著上升
  4. 那些拥有最多工作的人

成本效率低下及其解决方案

在分析了有问题的dbt作业、更新其查询和验证结果(数字保持不变+成本降低)后,我们了解到在BigQuery中处理数据时可能会发现这些效率低下的问题。

处理可以每天更新的所有时间数据

想象一下,你在一个电子商务应用程序工作,你的工作是提供交易和收入的每日摘要。您可以通过查询原始订单表并将结果存储在汇总表中来实现这一点。相当简单。然后创建每天运行的查询。

SELECT
  order_date,
  SUM(1) AS transactions,
  SUM(total_amount) AS revenue
FROM
  `raw.order`
GROUP BY
  order_date
ORDER BY
  order_date

通常,ELT是在D-1数据上运行的,这意味着你要等到第二天才能完全收集第一天的数据,然后再进行处理。因此,在第二天运行时,这就是你的结果。

╔════════════╦══════════════╦═══════════╗
║ order_date ║ transactions ║ revenue ║
╠════════════╬══════════════╬═══════════╣
║ 2022-01-01 ║ 1,124 ║ 3,567,300 ║
╚════════════╩══════════════╩═══════════╝

假设1行表示1个事务。这意味着您将在2022年1月1日处理1124行数据。

继续到第二天。

╔════════════╦══════════════╦═══════════╗
║ order_date ║ transactions ║ revenue ║
╠════════════╬══════════════╬═══════════╣
║ 2022-01-01 ║ 1,124 ║ 3,567,300 ║
║ 2022-01-02 ║ 1,549 ║ 4,938,100 ║
╚════════════╩══════════════╩═══════════╝

2022年1月3日,您总共处理了2673行。这就是它变得有趣的地方。如果你再看一次,你已经运行了两次1月1日的数据。然而,“交易”和“收入”的结果分别为1124和3567300。这意味着您正在重新计算1124行数据,以获得相同的结果。听起来有点浪费,是吗?

想象一下,这个过程一直持续到2022年1月6日。

╔════════════╦══════════════╦═══════════╗
║ order_date ║ transactions ║ revenue ║
╠════════════╬══════════════╬═══════════╣
║ 2022-01-01 ║ 1,124 ║ 3,567,300 ║
║ 2022-01-02 ║ 1,549 ║ 4,938,100 ║
║ 2022-01-03 ║ 1,374 ║ 4,538,900 ║
║ 2022-01-04 ║ 862 ║ 2,551,600 ║
║ 2022-01-05 ║ 938 ║ 3,498,500 ║
╚════════════╩══════════════╩═══════════╝

现在,您正在处理5847行,并且已经重新计算

  1. 1月1日数据的4倍,
  2. 1月2日数据的3倍,
  3. 1月3日数据的2倍,以及
  4. 1次1月4日的数据。

随着时间的推移,这个数字将不断增加。这就是处理后的数据的样子,与此相一致的是成本。

那么我们该如何解决这个问题呢?解决方案是只对新数据(以前没有计算过的数据)运行查询,并将结果附加到结果表中,而不是完全重写。换句话说:

  1. 在日期2,只运行1月1日的数据,然后将结果追加到汇总表中
  2. 在第3天,只运行1月2日的数据,然后像第1点一样追加结果
  3. 等等

处理后的数据将如下所示。它要小得多——这会导致更快的流程——而且效率更高,不是吗?

 

我们可以通过对日常数据而不是所有时间进行计算来降低成本。我们的日常数据越大,这种方法就越能为我们省钱。

在BigQuery中,这个称为分区的概念根据表的字段将表拆分为“迷你表”。在查询中,我们可以检索这些分区,而不是一个完整的表,然后将结果存储为目标表中的新分区(或覆盖)。因此,为了充分利用分区,源和目的地都需要启用这个概念。

  1. 主要针对源表,因为它决定了BigQuery中查询的成本,以及
  2. 因为它可能会成为其他查询的源表。

我们修改查询以检索源的分区,并将其存储为分区。以下是我们用来配置目标分区的dbt脚本片段。

{{ config(
     materialized = 'incremental',
     partition_by = {
       "field": "event_date_gmt7",
       "data_type": "date",
       "granularity": "day"
     }
)}}

通常,此片段告诉dbt按`event_date_gmt7`字段对目标表进行分区。

同时,这是我们源代码的片段。

FROM
  {{ ref('<fact_table_name>') }}
WHERE
  event_date_gmt7 > '<latest_date_partition_of_destination_table>'
  AND event_date_gmt7 <= DATE_SUB(CURRENT_DATE('+07:00'), INTERVAL 1 DAY)

通过在WHERE子句中使用“event_date_gmt7”,也就是这个源表的分区字段,我们已经告诉dbt只检索源表的一部分,而不是整个表。非常简单明了!

BigQuery还有另一个拆分表的概念,称为集群。简而言之,集群就像一个分区,但它不是基于一个字段,而是基于字段的组合。因此,在查询时,BigQuery将在扫描表源之前筛选分区字段和集群字段。下面是一个在表中启用集群的示例片段。

 

{{ config(
     materialized = 'incremental',
     partition_by = {
       "field": "event_date_gmt7",
       "data_type": "date",
       "granularity": "day"
     },
   cluster_by = [
      'user_interface',
      '<column_b>',
      '<column_c>',
      '…'
   ]
)}}

这是使用它的片段。

FROM
  {{ ref('<fact_table_name>') }}
WHERE
  event_date_gmt7 > '<latest_date_partition_of_destination_table>'
  AND event_date_gmt7 <= DATE_SUB(CURRENT_DATE('+07:00'), INTERVAL 1 DAY)
  AND user_interface = 'Android'

是的,这和使用分区是一样的。您只需要在WHERE子句中声明筛选器。再说一遍,非常简单明了!

实际上,您可以在不使用分区的情况下使用集群。但如果你将两者结合起来,你可以进一步优化你的成本。您可以在谷歌提供的这个简短的实验室中学习分区和集群。

扫描整张表以获取最新分区

查看源分区的前一个代码片段,我们检索最新目的地的日期分区。其目的是只运行源表中未处理的数据。那么我们该怎么做呢?

在大多数情况下,我们将使用此查询。

SELECT
  MAX(<partition_field>)
FROM
  `<source_table>`

然而,如果源表包含数百万行,我们将遍历所有这些行,并产生更多的成本。想象一下,一个表有100个分区,每个分区包含1000000行。通过使用实际的表,我们查询了100 x 1000000=100000000行。有没有更好的方法来获取最新的分区?

是的,当然,通过使用元数据表。在BigQuery中,元数据存储在INFORMATION_SCHEMA视图中,其中包含有关BigQuery对象的所有信息,包括存储在partitions视图中的表中的分区列表。此视图中的一行包含有关一个分区的信息,例如其表数据集/架构、表名称、分区ID、总行数和上次更新时间。以下是用于查询元数据的SQL代码段。

SELECT
  MAX(SAFE.PARSE_DATE('%Y%m%d', partition_id))
FROM
  `<project_name>.<dataset_name>.INFORMATION_SCHEMA.PARTITIONS`
WHERE
  table_name = '<table_name>'

因此,通过使用前面的示例,我们只使用元数据查询100行,而不是100000000行。更快、更便宜。

在dbt中,我们将这个查询放在宏中,这样我们就可以在ELT中重用它。宏代码将如下所示

{% macro get_latest_part_date(column_name, relation) %}

{% set relation_query %}
  ## Get latest date from metadata
  ## If 'column_name' is null, return D-1
  SELECT
    MAX(SAFE.PARSE_DATE('%Y%m%d', partition_id))
  FROM
    {{ relation.database }}.{{ relation.schema }}.INFORMATION_SCHEMA.PARTITIONS
  WHERE
    table_name = '{{ relation.identifier }}'
{% endset %}

{% set results = run_query(relation_query) %}

{% if execute %}
  {% if results.columns[0][0] != None %}
  ## Latest partition not null (found)
  {% set results_list = results.columns[0][0] %}
  {% else %}
  ## Default value
  {% set results_list = '2021–01–01' %}
{% endif %}

{{ return(results_list) }}

{% endmacro %}

查询将如下所示。

FROM
{{ ref('<fact_table_name>') }}
WHERE
event_date_gmt7 > '{{ get_latest_part_date('event_date_gmt7', this) }}'
AND event_date_gmt7 <= DATE_SUB(CURRENT_DATE('+07:00'), INTERVAL 1 DAY)

多次查询最新分区

另一点与之前关于不使用元数据表的学习有关,这一点与之并行。我们在ELT中经常使用多个源表来生成一个新的结果表。这最初使我们为每个源调用宏。

然而,几天后,我们意识到我们的成本又上升到了最初的水平。我们发现,每次调用宏时,它都会向BigQuery发送一个查询作业来检索结果。因此,如果我们得到10个使用该宏的源表,那么BigQuery将有10个查询作业返回相同的结果。考虑到当时我们仍然使用实际的表,想象一下有10个查询扫描包含数千万行的整个表。

我们只调用这个宏一次,将其作为CTE(WITH子句中的表),然后在每个源表的FROM子句中连接该表,从而解决了这个问题。

WITH
vars AS (
  -- Get latest partition of this table
  SELECT
    DATE("{{ get_latest_part_date('event_date_gmt7', this) }}") AS latest_partition
)
, t1 AS (
  SELECT
    f.*
  FROM
    {{ ref('<fact_table_name_1>') }} f,
    vars
  WHERE
    event_date_gmt7 > latest_partition
    AND event_date_gmt7 <= '<yesterday_date>'
)
, t2 AS (
  SELECT
    g.*
  FROM
    {{ ref('<fact_table_name_1>') }} g,
    vars
  WHERE
    event_date_gmt7 > latest_partition
    AND event_date_gmt7 <= '<yesterday_date>'
)
...

后果

结合这些经验来更新我们一些成本最高的dbt工作,我们将成本降低了约95.91%。我们还调整了Looker Studio数据源,将成本降低约98.97%。

结论

  1. 利用分区和集群的优势。或者数据库中提供的任何分块机制。
  2. 要获取最新的分区,请使用元数据而不是实际的表。只是你的表没有分区。
  3. 您可能使用宏不仅是为了运行查询以获取最新的分区,而且由于它会为每个调用的宏触发一个作业,请注意并将其设为“参数”。
  4. 请毫不犹豫地查看文档(例如,像dbt这样的ETL/ELT平台)。一开始这可能看起来很吓人,但我相信我们会了解到关于平台的一两件事(无论是什么),这可能对我们的问题有用,或者甚至在问题出现之前优化我们当前的脚本。你也可以从YouTube上学习教程或会议。
  5. 循序渐进地改进是可以的。这些知识,我也是一周又一周地发现的,不是一次全部发现的。追求进步而不是完美
  6. 最后但同样重要的是,如果您正在迁移/更改表,而不是创建一个新表,请确保您的表具有与以前的模型相同的结果,以便进行无缝更改。我们不想仅仅为了成本效益而损害我们报告的准确性。
本文地址
最后修改
星期六, 五月 13, 2023 - 12:52
Article