Importing Metadata from a CSV File into DataHub

2022-04-27

    A data platform represents a third-party system. Every piece of metadata DataHub ingests is associated with a platform, such as MySQL, Oracle, Hive, or HDFS.

    In some cases, though, the metadata you want to import does not belong to any platform DataHub supports out of the box. What then?

    DataHub provides a REST API for ingestion, with SDKs currently available for Python and Java. The examples below use the Java SDK.

    1. Add the SDK

    Add the dependency to pom.xml:

    <!-- https://mvnrepository.com/artifact/io.acryl/datahub-client -->
    <dependency>
        <groupId>io.acryl</groupId>
        <artifactId>datahub-client</artifactId>
        <!-- replace with the latest version number -->
        <version>0.0.1</version>
    </dependency>

    2. Add a data platform

    When adding a data platform you can specify its name and logo. Once the platform has been added and some metadata has been ingested under it, it shows up on the DataHub home page.

    Since this endpoint is simple, we call the HTTP API directly:

    # Add the SQLFlow data platform
    curl 'http://localhost:8080/entities?action=ingest' -X POST --data '{
       "entity":{
          "value":{
             "com.linkedin.metadata.snapshot.DataPlatformSnapshot":{
                "aspects":[
                   {
                      "com.linkedin.dataplatform.DataPlatformInfo":{
                          "datasetNameDelimiter": "/",
                          "name": "SQLFlow",
                          "type": "OTHERS",
                          "logoUrl": "http://101.43.8.206/lineage/images/logo.png"
                      }
                   }
                ],
                "urn":"urn:li:dataPlatform:SQLFlow"
             }
          }
       }
    }'
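
    If you prefer to stay in Java, the same request can be issued with the JDK's built-in HTTP client (Java 11+; the text block needs Java 15+). This is a sketch of the same call, not part of the DataHub SDK:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    // Ingest the SQLFlow data platform via the same /entities?action=ingest endpoint
    String body = """
        {
          "entity": { "value": { "com.linkedin.metadata.snapshot.DataPlatformSnapshot": {
            "aspects": [ { "com.linkedin.dataplatform.DataPlatformInfo": {
              "datasetNameDelimiter": "/",
              "name": "SQLFlow",
              "type": "OTHERS",
              "logoUrl": "http://101.43.8.206/lineage/images/logo.png"
            } } ],
            "urn": "urn:li:dataPlatform:SQLFlow"
          } } }
        }""";
    HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8080/entities?action=ingest"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    HttpResponse<String> resp = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(resp.statusCode() + " " + resp.body());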

    3. Add a dataset

    3.1 Configure the connection parameters

    import datahub.client.rest.RestEmitter;
    //...
    RestEmitter emitter = RestEmitter.create(b -> b
            .server("http://localhost:8080")
            // Auth token for Managed DataHub
            .token(AUTH_TOKEN_IF_NEEDED)
            // Override the default timeout of 10 seconds
            .timeoutSec(OVERRIDE_DEFAULT_TIMEOUT_IN_SECONDS)
            // Add additional headers
            .extraHeaders(Collections.singletonMap("Session-token", "MY_SESSION"))
            // Customize the HttpClient's connection time-to-live
            .customizeHttpAsyncClient(c -> c.setConnectionTimeToLive(30, TimeUnit.SECONDS))
    );
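
    Only server() is required; the other builder calls are optional overrides. Against a local quickstart instance without authentication, RestEmitter.createWithDefaults() (used in 3.2 below) already points at http://localhost:8080.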

    3.2 Define a dataset and upload it

    import com.linkedin.dataset.DatasetProperties;
    import com.linkedin.events.metadata.ChangeType;
    import datahub.event.MetadataChangeProposalWrapper;
    import datahub.client.rest.RestEmitter;
    import datahub.client.Callback;
    import datahub.client.MetadataWriteResponse;
    import org.apache.http.HttpResponse;
    // ... followed by
    
    // Creates the emitter with the default coordinates and settings
    RestEmitter emitter = RestEmitter.createWithDefaults(); 
    
    MetadataChangeProposalWrapper mcpw = MetadataChangeProposalWrapper.builder()
            .entityType("dataset")
            .entityUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my-dataset.user-table,PROD)")
            .upsert()
            .aspect(new DatasetProperties().setDescription("This is the canonical User profile dataset")) // a minimal aspect carrying only a description; see 3.3 for a full schema
            .build();
    
    
    // Non-blocking using callback
    emitter.emit(mcpw, new Callback() {
          @Override
          public void onCompletion(MetadataWriteResponse response) {
            if (response.isSuccess()) {
              System.out.println(String.format("Successfully emitted metadata event for %s", mcpw.getEntityUrn()));
            } else {
              // Get the underlying http response
              HttpResponse httpResponse = (HttpResponse) response.getUnderlyingResponse();
              System.out.println(String.format("Failed to emit metadata event for %s, aspect: %s with status code: %d",
                  mcpw.getEntityUrn(), mcpw.getAspectName(), httpResponse.getStatusLine().getStatusCode()));
              // Print the server side exception if it was captured
              if (response.getServerException() != null) {
                System.out.println(String.format("Server side exception was %s", response.getServerException()));
              }
            }
          }
    
          @Override
          public void onFailure(Throwable exception) {
            System.out.println(
                String.format("Failed to emit metadata event for %s, aspect: %s due to %s", mcpw.getEntityUrn(),
                    mcpw.getAspectName(), exception.getMessage()));
          }
        });
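
    If blocking behavior is fine (for example, in a one-off import script), you can skip the callback and wait on the Future that emit() returns. A minimal sketch:

    import java.util.concurrent.Future;
    // ...
    Future<MetadataWriteResponse> future = emitter.emit(mcpw, null);
    MetadataWriteResponse response = future.get(); // blocks until the server responds
    System.out.println("Emit succeeded: " + response.isSuccess());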

    3.3 Define the schema metadata

    Note: the metadata must not contain Chinese characters; if it does, the request will fail.
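
    The schema-building code below reads from list, a List<Map<String, String>> holding one map per CSV row. The original does not show how list is built; a minimal loader sketch, assuming a header row of schemaname,tablename,columnname,external_type and no quoted fields:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical CSV loader: each data row becomes a map keyed by the header names.
    static List<Map<String, String>> readCsv(String path) throws IOException {
        List<String> lines = Files.readAllLines(Paths.get(path));
        String[] headers = lines.get(0).split(",");
        List<Map<String, String>> rows = new ArrayList<>();
        for (String line : lines.subList(1, lines.size())) {
            String[] values = line.split(",", -1);
            Map<String, String> row = new HashMap<>();
            for (int i = 0; i < headers.length && i < values.length; i++) {
                row.put(headers[i].trim(), values[i].trim());
            }
            rows.add(row);
        }
        return rows;
    }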

    import com.linkedin.common.urn.DataPlatformUrn;
    import com.linkedin.schema.OtherSchema;
    import com.linkedin.schema.SchemaField;
    import com.linkedin.schema.SchemaFieldArray;
    import com.linkedin.schema.SchemaFieldDataType;
    import com.linkedin.schema.SchemaMetadata;
    // ...

    SchemaMetadata metadata = new SchemaMetadata();
    String schemaname = list.get(0).get("schemaname");
    String tablename = list.get(0).get("tablename");
    metadata.setSchemaName(schemaname);
    SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
    OtherSchema schema = new OtherSchema();
    schema.setRawSchema("SQLFlow");
    platformSchema.setOtherSchema(schema);
    metadata.setPlatformSchema(platformSchema);
    DataPlatformUrn urn = new DataPlatformUrn("SQLFlow");
    metadata.setPlatform(urn);
    metadata.setVersion(0);
    metadata.setHash(hashKeyForDisk(key)); // hashKeyForDisk / key: the author's helper and key, not shown here
    SchemaFieldArray fieldArray = new SchemaFieldArray();
    metadata.setFields(fieldArray);
    // One SchemaField per CSV row, i.e. per column of the source table
    for (Map<String, String> map : list) {
        SchemaField field = new SchemaField();
        field.setDescription("");
        field.setFieldPath(map.get("columnname"));
        field.setNativeDataType(map.get("external_type"));
        SchemaFieldDataType dataType = getDataType(field.getNativeDataType());
        field.setType(dataType);
        fieldArray.add(field);
    }
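
    getDataType is not shown in the original post. A plausible implementation, mapping the CSV's native type strings onto DataHub's type union (the keyword matching below is an assumption, not the author's code):

    import com.linkedin.schema.BooleanType;
    import com.linkedin.schema.DateType;
    import com.linkedin.schema.NumberType;
    import com.linkedin.schema.StringType;

    // Hypothetical mapping from a native type string (e.g. "varchar(20)", "bigint") to a DataHub type.
    static SchemaFieldDataType getDataType(String nativeType) {
        String t = nativeType == null ? "" : nativeType.toLowerCase();
        SchemaFieldDataType.Type type;
        if (t.contains("int") || t.contains("float") || t.contains("double")
                || t.contains("decimal") || t.contains("number")) {
            type = SchemaFieldDataType.Type.create(new NumberType());
        } else if (t.contains("bool")) {
            type = SchemaFieldDataType.Type.create(new BooleanType());
        } else if (t.contains("date") || t.contains("time")) {
            type = SchemaFieldDataType.Type.create(new DateType());
        } else {
            type = SchemaFieldDataType.Type.create(new StringType()); // fall back to string
        }
        return new SchemaFieldDataType().setType(type);
    }

    The finished SchemaMetadata aspect is then uploaded exactly like the DatasetProperties aspect in 3.2; the dataset name format below (schema.table under the SQLFlow platform) is an assumption:

    MetadataChangeProposalWrapper schemaMcpw = MetadataChangeProposalWrapper.builder()
            .entityType("dataset")
            .entityUrn("urn:li:dataset:(urn:li:dataPlatform:SQLFlow," + schemaname + "." + tablename + ",PROD)")
            .upsert()
            .aspect(metadata)
            .build();
    emitter.emit(schemaMcpw, null).get(); // block until the schema is written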

    4. Delete datasets

    To remove the datasets ingested under the SQLFlow platform, use the DataHub CLI:

    datahub delete --env PROD --entity_type dataset --platform SQLFlow
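
    By default this performs a soft delete (the datasets disappear from search and the UI but are retained in the backend); adding --hard should remove the underlying records as well, though check datahub delete --help for the exact flags in your CLI version.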