Datavines 集成 DolphinScheduler 打通数据质量管理最后一公里 ↗

文摘   2024-10-30 00:00   重庆  

导 读  数据质量检查是数据处理中不可或缺的一个环节,用来保证流向下游的数据的准确性。Datavines 是专注于数据可观测性的开源项目,核心能力提供了元数据管理和数据质量监控。本文带来 DolphinScheduler 与 Datavines 的集成方案,使其成为DolphinScheduler 数据处理DAG 中的一个环节,阻断错误数据流向下游。

注:拉取最新的dev分支代码进行部署,更新的用户需要新增dv_access_token,表结构在mysql脚本中。

Githubhttps://github.com/datavane/datavines

欢迎关注、Star、Fork,参与贡献 


DS 集成 Datavines 步骤

第一步:申请令牌

在 Datavines 中申请令牌,这个令牌是调用 Datavines OpenApi 时用的,步骤如下:

创建令牌

查看令牌:

第二步:创建作业

Datavines 中创建数据质量检查作业,拿到规则作业ID

第三步:在 DS 中创建任务

创建任务

  • 进入项目以后,创建或选择一个工作流,进入工作流编辑页面
  • 新增一个 Task,选择 Shell 任务类型

输入脚本模版

将下面脚本复制到脚本输入框里面,这个脚本是我们提供的脚本模版,用户可以根据自己的需要进行修改。

当前脚本的运行结果是根据检查结果来输出的,结果为成功则 exit 0 ,失败则 exit 1。

#!/bin/bash

job_execute_url="${datavines_address}/api/v1/openapi/job/execute"
job_execution_status_url="${datavines_address}/api/v1/openapi/job/execution/status/"
job_execution_result_url="${datavines_address}/api/v1/openapi/job/execution/result/"

repsonse_job_execute=$(curl -s -X POST "$job_execute_url/${job_id}" -H "Authorization: Bearer ${token}")

if [ -z "$repsonse_job_execute" ]; then
  echo "[DATAVINES][ERROR]: job execute response is null"
  exit 1
fi

code_job_execute=$(echo "$repsonse_job_execute" | sed -n 's/.*"code":\([0-9]*\).*/\1/p')
job_execution_id=$(echo "$repsonse_job_execute" | sed -n 's/.*"data":\([0-9]*\).*/\1/p')

# 判断 job_execute 接口返回的 code 是否为 200
if [ "$code_job_execute" -ne 200 ]; then
  echo "[DATAVINES][ERROR]: job execute response : $repsonse_job_execute"
  exit 1
fi

echo "[DATAVINES][INFO]: job execution id is $job_execution_id"

# 检查 id 是否为空
if [ -z "$job_execution_id" ]; then
  echo "[DATAVINES][ERROR]: job execute repsonse id is null, exit"
  exit 1
fi

# 轮询 job_execution_status 接口
while truedo
  # 调用 job_execution_status 接口(GET)
  response_job_execution_status=$(curl -s -X GET "$job_execution_status_url/$job_execution_id" -H "Authorization: Bearer ${token}")

  # 提取 job_execution_status 接口返回的 code 和 data 值
  code_job_execution_status=$(echo "$response_job_execution_status" | sed -n 's/.*"code":\([0-9]*\).*/\1/p')
  data_job_execution_status=$(echo "$response_job_execution_status" | sed -n 's/.*"data":"\([^"]*\)".*/\1/p')

  # 判断 job_execution_status 接口返回的 code 是否为 200
  if [ "$code_job_execution_status" -ne 200 ]; then
    echo "[DATAVINES][ERROR]: job execution status repsonse is error: $response_job_execution_result"
    exit 1
  fi

  # 检查返回的 data 值
  if [ "$data_job_execution_status" == "FAILURE" ]; then
    echo "[DATAVINES][ERROR]: job execution status is $data_job_execution_status , exit"
    exit 1
  elif [ "$data_job_execution_status" == "SUCCESS" ]; then
    echo "[DATAVINES][INFO]: job execution status is $data_job_execution_status , to get job execution result"
    
    # 调用 job_execution_result 接口(GET)
    repsonse_job_execution_result=$(curl -s -X GET "$job_execution_result_url/$job_execution_id" -H "Authorization: Bearer ${token}")

    # 提取 job_execution_result 接口返回的 code 和 data
    code_job_execution_result=$(echo "$repsonse_job_execution_result" | sed -n 's/.*"code":\([0-9]*\).*/\1/p')
    data_job_execution_result=$(echo "$repsonse_job_execution_result" | sed -n 's/.*"data":"\([^"]*\)".*/\1/p')

    # 判断 job_execution_result 接口返回的 code 是否为 200
    if [ "$code_job_execution_result" -ne 200 ]; then
      echo "[DATAVINES][ERROR]: job execution result is error: $repsonse_job_execution_result"
      exit 1
    fi

    # 检查 job_execution_result 接口的返回 data 值
    if [ "$data_job_execution_result" == "SUCCESS" ]; then
      echo "[DATAVINES][INFO]: job execution result is $data_job_execution_result"
      exit 0
    else
      echo "[DATAVINES][INFO]: job execution result is $data_job_execution_result"
      exit 1
    fi
  else
    # data_job_execution_status 为其他值时记录日志,但不中断执行
    echo "[DATAVINES][INFO]: job execution status is $data_job_execution_status , continute"
  fi

  # 等待一段时间再进行下一次轮询
  sleep 2
done

添加参数

这里需要输入三个参数

  • datavines_address: datavines 服务的地址,比如 http://localhost:5600
  • job_id:作业ID,可以从作业列表上进行复制
  • token:在令牌管理页面可以找到

第四步:运行工作流

完成上面几个步骤以后,运行 DolphinScheduler 的工作流,查看任务实例日志如下:

以上就是 DolphinScheduler 集成 Datavines 的步骤,抓紧用起来吧!!!

加入我们

Datavines 的目标是成为更好的数据可观测性领域的开源项目,为更多的用户去解决元数据管理和数据质量管理中遇到的问题。在此我们真诚欢迎更多的贡献者参与到社区建设中来,和我们一起成长,携手共建更好的社区。
  • 项目地址:
    https://github.com/datavane/datavines
  • 问题和建议:
    https://github.com/datavane/datavines/issues
  • 贡献代码:
    https://github.com/datavane/datavines/pull
  • 社区沟通:

大数据技能圈
分享大数据前沿技术,实战代码,详细文档
 最新文章