Creating custom materialization in DBT using Jinja2

Materialization: 

Materializations are the strategies DBT uses to persist models in a warehouse. The built-in materializations covered here are table, view, incremental, and ephemeral.

Table:

A table is a database object that stores data in a relational database as rows and columns. Unlike a view, it physically holds the data.

View:

A view contains no data of its own. It is a query of one or more tables that provides another way of presenting the information.
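
The difference between a table and a view can be seen in a minimal Python sketch. It assumes SQLite as a stand-in for the warehouse, and the names emp_src, emp_table, and emp_view are hypothetical. DBT is not involved here; only the underlying SQL behaviour is shown.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("create table emp_src (id integer, name text)")
con.execute("insert into emp_src values (1, 'Ana')")

# A table stores the query result itself.
con.execute("create table emp_table as select * from emp_src")
# A view stores only the query.
con.execute("create view emp_view as select * from emp_src")

# New source data arrives after both objects were built.
con.execute("insert into emp_src values (2, 'Raj')")

table_rows = con.execute("select count(*) from emp_table").fetchone()[0]
view_rows = con.execute("select count(*) from emp_view").fetchone()[0]
print(table_rows, view_rows)  # 1 2
```

The table keeps the snapshot taken when it was built, while the view reflects the new source row because it reruns its query on every read.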

Incremental:

Incremental models let DBT insert or update only the records that are new or changed since the last run. Because only the latest records are transformed, incremental models can significantly reduce build time.
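
The idea can be sketched in Python, assuming SQLite as a stand-in warehouse and a hypothetical loaded_at column that marks when each row arrived. This is an illustration of the pattern, not DBT's incremental implementation.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("create table events_src (id integer, loaded_at integer)")
con.execute("create table events_tgt (id integer, loaded_at integer)")

def incremental_run(con):
    """Copy only source rows newer than the newest row already in the target."""
    before = con.execute("select count(*) from events_tgt").fetchone()[0]
    # High-water mark: the latest loaded_at already present in the target.
    high_water = con.execute(
        "select coalesce(max(loaded_at), -1) from events_tgt").fetchone()[0]
    con.execute(
        "insert into events_tgt "
        "select id, loaded_at from events_src where loaded_at > ?",
        (high_water,))
    after = con.execute("select count(*) from events_tgt").fetchone()[0]
    return after - before

con.executemany("insert into events_src values (?, ?)", [(1, 100), (2, 200)])
first_run = incremental_run(con)    # both rows are new
con.execute("insert into events_src values (3, 300)")
second_run = incremental_run(con)   # only the latest row is processed
print(first_run, second_run)  # 2 1
```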

Ephemeral:

Ephemeral models are not built in the database. Instead, DBT interpolates the code of an ephemeral model into the models that depend on it as a common table expression.

Ephemeral models can help keep the data warehouse free of clutter (also consider splitting models across multiple schemas by using custom schemas).
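
A simplified Python sketch of that interpolation follows. The helper inline_ephemeral and the model names are hypothetical, SQLite stands in for the warehouse, and the naming and rewriting DBT actually performs are not reproduced.

```python
import sqlite3

def inline_ephemeral(name, ephemeral_sql, dependent_sql):
    """Prepend the ephemeral model's SQL as a CTE (hypothetical helper, not a DBT API)."""
    return f"with {name} as (\n{ephemeral_sql}\n)\n{dependent_sql}"

con = sqlite3.connect(":memory:")
con.execute("create table emp_src (id integer, active integer)")
con.executemany("insert into emp_src values (?, ?)", [(1, 1), (2, 0), (3, 1)])

active_emp_sql = "select id from emp_src where active = 1"  # the ephemeral model
report_sql = "select count(*) from active_emp"              # a model that depends on it

compiled = inline_ephemeral("active_emp", active_emp_sql, report_sql)
active_count = con.execute(compiled).fetchone()[0]

# Only the source table exists; the ephemeral model left no object behind.
objects = con.execute("select name from sqlite_master").fetchall()
print(active_count, objects)  # 2 [('emp_src',)]
```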

Custom Materialization:

A materialization translates the SQL statement of a DBT model into a dataset in the database.

A materialization can configure the database for the new model, run pre-hooks, execute the SQL needed to implement the desired materialization, run post-hooks, clean up the database as needed, and update the relation cache.

The custom materialization should be created in project_folder/macros/file_name.sql.

Why do we need custom materialization?

With a custom materialization, we can process the data according to our needs and implement load patterns such as an append pattern or a delta pattern.

Creating Materialization:

The basic materialization to create a model as a view:

{% materialization mat_name, adapter='default' %}
    {% call statement('main') %}
        {{ create_view_as(this, sql) }}
    {% endcall %}
    {{ return({'relations': [this]}) }}
{% endmaterialization %}

mat_name is the name of the custom materialization.

adapter names the adapter this implementation applies to. The project's active adapter comes from profiles.yml, and 'default' applies when no adapter-specific implementation exists; any adapter installed in the project environment can be named instead.

The statement call block executes SQL against the database and can return the results to the Jinja environment. The statement named 'main' runs the SQL that builds the model.

create_view_as is a built-in macro that generates the create view statement from two parameters: a relation and an SQL query.

this refers to the relation of the current model: its database, schema, and model name (for example, "database_name"."schema_name"."table_name").

To obtain specific information, the user can use this.database, this.schema, this.table, or this.identifier.
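
How a relation exposes its parts and renders as a qualified name can be pictured with a small Python sketch. The Relation class below is a hypothetical stand-in, not DBT's own class, and the double-quote style is an assumption, since quoting depends on the adapter.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Relation:
    """Hypothetical stand-in for the relation object behind `this`."""
    database: str
    schema: str
    identifier: str

    def __str__(self):
        # Render the fully qualified, quoted name used inside SQL.
        parts = (self.database, self.schema, self.identifier)
        return ".".join(f'"{part}"' for part in parts)

this = Relation("database_name", "schema_name", "table_name")
rendered = str(this)
print(rendered)     # "database_name"."schema_name"."table_name"
print(this.schema)  # schema_name
```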

The sql variable contains the compiled SQL query written in the model file.

At the end of execution, the materialization should return the list of relations it has created.

Note: To execute pre-hooks or post-hooks, the materialization must call the run_hooks macro itself.

{# pre-hooks #}
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{# `BEGIN` happens here #}
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{# post-hooks #}
{{ run_hooks(post_hooks, inside_transaction=True) }}
{{ run_hooks(post_hooks, inside_transaction=False) }}
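
The ordering implied by these four calls can be traced with a short Python sketch. It assumes the main statement runs between the pre-hook and post-hook groups and that the commit falls between the two post-hook calls, mirroring where BEGIN is placed above. The (sql, inside_transaction) pairs are a hypothetical representation of hooks, not DBT's own.

```python
def execution_order(pre_hooks, post_hooks, main_sql):
    """Return the order in which hooks and the main statement would run.

    Each hook is a (sql, inside_transaction) pair. This is an illustrative
    trace under the assumptions stated above, not DBT's implementation.
    """
    order = []
    order += [sql for sql, inside in pre_hooks if not inside]
    order.append("BEGIN")
    order += [sql for sql, inside in pre_hooks if inside]
    order.append(main_sql)
    order += [sql for sql, inside in post_hooks if inside]
    order.append("COMMIT")
    order += [sql for sql, inside in post_hooks if not inside]
    return order

pre = [("grant usage", False), ("set timezone", True)]
post = [("analyze", True), ("vacuum", False)]
order = execution_order(pre, post, "create view emp as select 1")
for step in order:
    print(step)
```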
				
			

Calling Materialization:

{{ config(
    materialized='mat_name'
) }}

select * from {{ source('source_1', 'emp_src') }}

Using config(), a model can set its materialization and pass any other settings the materialization needs.

materialized names the materialization used to build the model (for example view, table, or incremental). Here it is the custom materialization mat_name.

The model is therefore built by mat_name, which creates it as a view.

The source function refers to a source table declared in the source.yml file.

Config properties:

Values set in a model's config can be read inside the materialization with config.get().

Passing values in config:

{{ config(
    materialized='mat_name',
    primary_key='id',
    exclude_cols=none
) }}

select * from {{ source('source_1', 'emp_src') }}

Getting values in materialization:

{% materialization mat_name, adapter='default' %}
    {# primary_key is mandatory; exclude_cols is optional #}
    {% set primary_key = config.require('primary_key') %}
    {% set exclude_cols = config.get('exclude_cols', default=none) %}

    {% call statement('main') %}
        create or replace table {{ this }} as ( {{ sql }} );
        alter table {{ this }} add primary key ({{ primary_key }});
        {% if exclude_cols is not none %}
            alter table {{ this }} drop column {{ exclude_cols }};
        {% endif %}
    {% endcall %}

    {{ return({'relations': [this]}) }}
{% endmaterialization %}

config.get reads an optional value from the model's configuration and falls back to the given default when the value is not set.

config.require reads a mandatory value. It raises an error if the value is not found in the model's configuration.
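
The contrast between the two calls can be mimicked in plain Python. The ModelConfig class below is a hypothetical stand-in written for illustration; it is not DBT's config object, and the error type is an assumption.

```python
class ModelConfig:
    """Hypothetical stand-in for the `config` object seen by a materialization."""

    def __init__(self, values):
        self._values = dict(values)

    def get(self, name, default=None):
        # Optional setting: fall back to the default when the model omits it.
        return self._values.get(name, default)

    def require(self, name):
        # Mandatory setting: fail loudly when the model omits it.
        if name not in self._values:
            raise KeyError(f"required config '{name}' was not provided")
        return self._values[name]

config = ModelConfig({"materialized": "mat_name", "primary_key": "id"})
primary_key = config.require("primary_key")           # present, so returned
exclude_cols = config.get("exclude_cols", default=None)  # absent, so default

try:
    config.require("unique_key")
    missing_error = None
except KeyError as err:
    missing_error = str(err)
print(primary_key, exclude_cols, missing_error)
```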

Incremental materialization that does not allow duplicates on the first run:

In the default DBT incremental materialization, duplicate rows are permitted on the first run, because the target table is built directly from the model's query.

To overcome this, we develop our own materialization.

{% materialization incremental_new, default %}

    {# unique_key is mandatory for this materialization #}
    {% set unique_key = config.require('unique_key') %}
    {% set target_relation = this %}

    {# look up the target; none means this is the first run #}
    {% set target_relation_org = adapter.get_relation(
        database=this.database, schema=this.schema, identifier=this.table) %}

    {# count distinct records #}
    {% set query_all %}
        select count(*) from (
            select distinct * from ( {{ sql }} ) as src
        ) as distinct_rows
    {% endset %}
    {% set row_count_all = run_query(query_all).columns[0].values()[0] %}

    {# count distinct unique keys #}
    {% set query_pk %}
        select count({{ unique_key }}) from (
            select distinct {{ unique_key }} from ( {{ sql }} ) as src
        ) as distinct_keys
    {% endset %}
    {% set row_count_pk = run_query(query_pk).columns[0].values()[0] %}

    {# equal counts mean every unique key maps to exactly one distinct row #}
    {% if row_count_all != row_count_pk %}
        {{ exceptions.raise_compiler_error("Rows with the same unique key contain different data!") }}
    {% endif %}

    {% set build_sql %}
        {% if target_relation_org is none %}
            -- first run: create an empty target table with the model's columns
            create or replace table {{ target_relation }} as (
                select * from ( {{ sql }} ) as src where 1 = 2
            );
        {% endif %}

        -- insert distinct records whose key is not yet in the target
        insert into {{ target_relation }} (
            select distinct * from ( {{ sql }} ) as src
            where {{ unique_key }} not in ( select {{ unique_key }} from {{ target_relation }} )
        )
    {% endset %}

    {# run the generated SQL as the model's main statement #}
    {% call statement('main') %}
        {{ build_sql }}
    {% endcall %}

    {{ return({'relations': [this]}) }}

{% endmaterialization %}
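
The duplicate check at the heart of this materialization can be tried outside DBT. The following Python sketch assumes SQLite as a stand-in warehouse; the function load_without_duplicates and the tables src and tgt are hypothetical names. It compares the number of distinct rows with the number of distinct keys, then inserts only rows whose key is not yet in the target.

```python
import sqlite3

def load_without_duplicates(con, source, target, key):
    """Insert distinct source rows with unseen keys; refuse conflicting duplicates."""
    distinct_rows = con.execute(
        f"select count(*) from (select distinct * from {source})").fetchone()[0]
    distinct_keys = con.execute(
        f"select count({key}) from (select distinct {key} from {source})").fetchone()[0]
    # Unequal counts mean at least one key maps to rows with different data.
    if distinct_rows != distinct_keys:
        raise ValueError(f"rows sharing the same {key} contain different data")
    con.execute(
        f"insert into {target} select distinct * from {source} "
        f"where {key} not in (select {key} from {target})")

con = sqlite3.connect(":memory:")
con.execute("create table src (id integer, name text)")
con.execute("create table tgt (id integer, name text)")
# The source holds an exact duplicate of the row with id 1.
con.executemany("insert into src values (?, ?)", [(1, "Ana"), (1, "Ana"), (2, "Raj")])

load_without_duplicates(con, "src", "tgt", "id")  # first run
load_without_duplicates(con, "src", "tgt", "id")  # second run adds nothing
rows = con.execute("select id, name from tgt order by id").fetchall()
print(rows)  # [(1, 'Ana'), (2, 'Raj')]
```

Exact duplicates collapse into one row, while a source row such as (2, 'Bob') would make the counts differ and raise the error instead of loading ambiguous data.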
				
			

Other features used in materialization or macro:

Get relation:

				
{%- set source_relation = adapter.get_relation(
    database="database_name",
    schema="schema_name",
    identifier="table_name") -%}

Returns the relation, which renders as "database"."schema"."identifier", or None if the relation does not exist.

identifier refers to the table name.

Get columns in relation:

{%- set columns = adapter.get_columns_in_relation(this) -%}

Returns a list of the columns in the given relation.

Create schema:

{% do adapter.create_schema(api.Relation.create(database=target.database, schema="my_schema")) %}

Creates a schema in the target database.

Drop schema:

{% do adapter.drop_schema(api.Relation.create(database=target.database, schema="my_schema")) %}

Drops a schema (or equivalent) in the target database.

Drop relation:

{% do adapter.drop_relation(this) %}

The drop_relation method drops the specified relation in the database and removes it from DBT’s relation cache.
