数据库同步工具
sqlserver,Mysql数据同步软件

使用Azure Data Factory将数据加载到Azure Data Lake中的快速方法

购买事宜请联系QQ:1793040

问题

在上一篇文章 Azure Data Factory Pipeline中,将所有SQL Server对象完全加载到ADLS Gen2中,我使用Azure Data Factory成功地将许多SQL Server表加载到Azure Data Lake Store Gen2中。尽管较小的表在创纪录的时间内加载,但数十亿条记录(400GB以上)的大表却运行了18-20多个小时。如何将大数据表从本地SQL Server更快地加载到ADLS?

Azure数据工厂是基于云的强大数据集成。在Azure数据工厂中,“映射数据流”复制活动具有基于GUI的工具,该工具允许并行加载分区数据。但是,“映射数据流”当前当前不支持本地源,因此该选项当前不在表格中。在考虑可能有哪些替代选项时,我知道我以前的文章《 使用SQL Server Integration Services根据条件生成Excel文件》,我讨论了如何将大型SQL Server表拆分为多个文件。其背后的想法是使用两个ForEach循环为所有分区范围创建文件。第一个ForEach循环查找表,并将其传递给第二个Nested ForEach循环,该循环将查找分区范围,然后生成文件。如果将相同的概念应用于Azure数据工厂,则知道存在一个可用于此任务的查找和ForEach活动,但是,嵌套的ForEach循环不是ADF复制活动的功能。同样,此选项不在桌面上。基于本地SQL Server表的列表,我还需要哪些其他选项来在数据湖中生成分区文件?

尽管目前ADF从本地SQL Server表在Data Lake中生成分区文件的功能受到限制,但我可以实现一个自定义解决方案来实现此任务。在本文中,我将详细介绍利用在前几篇文章中创建的管道参数表的过程,并介绍一个新表,该表将用于存储分区字段。此过程的好处有两个:1)我将能够通过分区范围将本地SQL Server表加载到ADLS,而分区范围将使用Azure Data Factory并行加载; 2)分区记录也将得到简化进入相同的Azure Data Factory管道,查找,和foreach循环活动作为未分区的表,这些表均已在我的管道参数表中标记和定义。本文将不介绍用于提高Azure Data Factory中的速度和性能的任何性能优化技术和比较。

先决条件

作为先决条件,我建议您熟悉我之前与该主题相关的一些文章,这些文章最终会导致该过程:

  • Azure数据工厂管道将所有SQL Server对象完全加载到ADLS Gen2
  • 记录Azure数据工厂管道审核数据
  • 使用Azure Data Lake Store gen2中的COPY INTO Azure Synapse Analytics
  • 使用Azure数据工厂将Data Lake文件加载到Azure Synapse DW中

创建并填充管道参数分区表

在我以前的文章中,我介绍了管道参数表的概念,该表将包含ADF管道编排过程的元数据。

接下来,我将创建管道参数分区表来存储与分区相关的列。下面的SQL语法将创建pipeline_parameter_partition表,并在管道参数表中添加外键约束。

SET ANSI_NULLS ON GO

 
SET QUOTED_IDENTIFIER ON GO

 
CREATE TABLE [ DBO ] [。pipeline_parameter_partition ]([ partition_id时] [ BIGINT ] IDENTITY (1 ,1 )NOT NULL , - 主密钥[ pipeline_parameter_id ] [ INT ] NULL ,--foreign关键pipeline_parameter ID [ src_name ] [ nvarchar的] (500 )NULL ,- 源SQL 表名
    [  
        
      
     partition_field ] [ nvarchar ](500 )NULL ,在源系统中标识的--partition字段[ partition_watermark_start ] [ nvarchar ](500 )NULL ,-- watermark start [ partition_operator ] [ nvarchar ](500 )NULL ,-- operator可以更改[ partition_watermark_end ] [ nvarchar ](500 )NULL ,-- watermark end CONSTRAINT   
      
      
      
  [ PK_pipeline_parameter_partition ] PRIMARY KEY CLUSTERED ([ partition_id时] ASC )WITH (STATISTICS_NORECOMPUTE = OFF ,IGNORE_DUP_KEY = OFF )ON [ PRIMARY ] )ON [ PRIMARY ] GO    

    
     
  

 
ALTER TABLE [ DBO ]。[ pipeline_parameter_partition ] WITH CHECK 加入约束[ FK_pipeline_parameter_id ] 外国KEY ([ pipeline_parameter_id ])参考[ DBO ]。[ pipeline_parameter ] ([ ID ])GO           
  

 
ALTER TABLE [ dbo ]。[ pipeline_parameter_partition ] 检查约束[ FK_pipeline_parameter_id ] GO     

创建并填充pipeline_parameter_partition表后,它看起来将类似于以下内容:

管道参数分区表

创建数据工厂管道

现在已经创建并填充了基本表,可以创建数据工厂管道。与我以前的文章中包含创建管道的步骤类似,该管道以Lookup和ForEach活动开始。

数据工厂管道流

查找活动–设置和源查询

查找源数据集是Azure SQL DB,其中包含pipeline_parameter和pipeline_parameter_partition表。

源数据集

源查询将pipeline_parameter_partition表与pipeline_parameter表联接在一起,以获取需要分区的表和不需要分区的表的UNION结果集。

选择[ server_name ] ,[ src_type ] ,[ src_schema ] ,[ src_db ] ,p 。[ src_name ] ,[ dst_type ] ,[ dst_name ] + '_' + pp 。[ partition_watermark_end ] as [ dst_name ] ,[ include_pipeline_flag ] ,[ process_type ] ,[ priority_lane ] ,[ pipeline_date ] ,[ pipeline_status ] 
    
   
   
   
   
   
     
   
   
   
   
   
   ,[dst_folder]+'/'+[dst_name] AS [dst_folder]
   ,[file_type]
   ,[last_modified_folder_date]
   ,pp.[partition_field]
   ,pp.partition_watermark_start
   ,pp.partition_watermark_end
   ,pp.partition_operator
FROM [dbo].[pipeline_parameter] p
INNER JOIN [dbo].[pipeline_parameter_partition] pp ON pp.pipeline_parameter_id = p.ID
WHERE adhoc = 1
  AND priority_lane = 1
 
UNION
 
SELECT 
    [server_name]
   ,[src_type]
   ,[src_schema]
   ,[src_db]
   ,[src_name]
   ,[dst_type]
   ,[dst_name]
   ,[include_pipeline_flag]
   ,[process_type]
   ,[priority_lane]
   ,[pipeline_date]
   ,[pipeline_status]
   ,[dst_folder]+'/'+[dst_name] AS [dst_folder]
   ,[FILE_TYPE ] ,[ last_modified_folder_date ] ,'1' AS partition_field
    ,'1' AS partition_watermark_start
    ,'1' AS partition_watermark_end
    ,'之间' AS partition_operator
 FROM [ DBO ]。[ pipeline_parameter ] p
 WHERE 即席= 1 AND priority_lane = 1 AND p 。ID 未在(SELECT DISTINCT
   
         
   
      pipeline_parameter_id FROM [ DBO ]。[ pipeline_parameter_partition ] PP ) 
			

结果集可能类似于以下内容,其中我有一个非分区表的记录以及三个分区表的记录,因为我在pipeline_parameter_partition表中定义了三个分区。使用这种方法,我可以将所有源记录合并到一个源查找活动中,并使流程由管道参数表驱动。

源SQL查找查询

我将以下字段设置为1的原因是,当我将这些字段传递给ForEach循环的Source SQL Select语句时,我希望这些字段的值为true。

,'1'AS partition_field
,'1'AS partition_watermark_start
,'1'AS partition_watermark_end			

该查询看起来像这样,其中1和1之间为1,其结果为true。这样,查找查询不会在需要将管道参数加载到数据湖但不需要进行分区的管道参数表上失败,从而适应了分区表和非分区表的简化过程。

选择* FROM @ {item()。server_name} 。@ {item()。src_db} 。@ {item()。src_schema} 。@ {item()。src_name}
@ {item()。partition_field} @ {item()。partition_operator} @ {item()。partition_watermark_start}和
@ {item()。partition_watermark_end}			

ForEach循环活动–设置

确保ForEach循环活动设置项包含查找活动的输出值非常重要。

另外,请注意,未选中“顺序”设置,因此将并行创建文件。也可以定义批数。默认批处理计数为20,最大为50。

顺序设定

每次循环活动

深入研究ForEach Loop活动后,我们看到画布上有许多活动。我以前的文章更详细地讨论了成功/失败日志记录过程。就本文而言,我将重点介绍ForEach Loop活动的存储过程和复制活动。

循环活动

ForEach循环活动–存储过程

存储过程被设计为基于pipeline_parameter_partition表的ID列,基于两个表中源名称的联接。

存储过程

SET ANSI_NULLS ON GO

 
SET QUOTED_IDENTIFIER ON GO

 
- ============================================= CREATE PROCEDURE [ DBO ]。[ update_pipeline_partition_fk ] AS BEGIN UPDATE pipeline_parameter_partition
   SET     pipeline_parameter_partition 。pipeline_parameter_id = p 。ID
   FROMpipeline_parameter_partition    pp
   INNER JOINpipeline_parameter p ON pp 。src_name = p 。src_name
 END GO
  


   
	

1)pipeline_parameter表:

我的管道参数表中有两条记录。我的pipeline_parameter表的ID = 32是我的分区表,因为它包含pipeline_parameter_partition表中的匹配记录,其中pipeline_parameter_partition.src_name = pipeline_parameter.src_name:

src名称

2)pipeline_parameter_partition表:

运行存储过程后,我们可以看到pipeline_parameter_partition中的pipeline_parameter_id已更新:

分区字段

ForEach循环活动–复制数据

复制数据活动将运行查询以查找表,其中分区字段位于分区水印开始字段和分区水印结束字段之间。

复制数据

选择* FROM @ {item()。server_name} 。@ {item()。src_db} 。@ {item()。src_schema} 。@ {item()。src_name}
@ {item()。partition_field} @ {item()。partition_operator} @ {item()。partition_watermark_start}和
@ {item()。partition_watermark_end}			

管道数据集

作为上一篇文章的更新,接收器包含以下数据集属性:

数据库属性

接收器数据集的连接属性类似于以下内容:

模式参数

管道结果

管道完成运行后,所有活动均已成功。

集成运行时

浏览目标Data Lake文件夹后,我们可以看到总共生成了四个文件。

对于非分区表,有一个实木复合地板文件。

实木复合地板文件

在pipeline_parameter_partition表中为三个不同的分区提供了三个拼花文件。

实木复合地板

下一步

  • 有关如何在源SQL Server表上编写SQL查询以创建一系列分区的更多信息,然后可以使用这些查询来填充pipeline_parameter_partition表,请参见这篇出色的MSSQLTips文章: 在SQL Server中对数据进行分区而不使用分区表。
  • 有关在参数为NULL的情况下编写SQL查询以选择全部的更多信息,请参见 在参数为空或NULL时选择全部的SQL查询。
  • 有关编写SQL查询以在NULL时忽略WHERE子句的更多信息,请参见 如果parameter为null,则 SQL忽略WHERE的一部分。
  • 有关编写SQL查询以结合UNION和INNER JOIN的更多信息,请参见 如何结合UNION和INNER JOIN?
  • 有关优化ADF复印活动性能的更多信息,请参阅 复印活动性能优化功能。
  • 有关对ADF复制活动的性能进行故障排除和调优的详细信息,请参阅《 复制活动性能和调优指南》。

未经允许不得转载:数据库同步软件|Mysql数据同步软件|sqlserver数据库同步工具|异构同步 » 使用Azure Data Factory将数据加载到Azure Data Lake中的快速方法

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

syncnavigator 8.6.2 企业版

联系我们联系我们