Snowflake’s Metadata-Driven Email and File Notification Solution

Snowflake can do more than store and query data: it can also send notifications, either directly via email or by generating files in cloud storage. Here's a walkthrough on setting up and using this feature.

Configuring the Notification Integration

To initiate, a notification integration in Snowflake must be established:

                  
                    create or replace notification integration my_email
                    type=email
                    enabled=true
                    allowed_recipients=('user@example.com');
                      
                

In this example, we've named our integration `my_email`. For security, Snowflake restricts email recipients to users of the current Snowflake account who have verified their email addresses.

A few salient aspects of the Notification System include:

  • Emails sent by the Notification System are transmitted through Snowflake's AWS infrastructure, using AWS Simple Email Service (SES).
  • To ensure reliable delivery, the content of an email sent via AWS is retained by Snowflake for thirty days, after which it is deleted.
  • Email addresses must be verified. Each address in ALLOWED_RECIPIENTS must be verified by the Snowflake user it belongs to; otherwise the CREATE NOTIFICATION INTEGRATION command fails.
  • One limitation: each account can create a maximum of ten notification integrations.
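Once the integration exists, it can be smoke-tested directly with Snowflake's built-in SYSTEM$SEND_EMAIL function. A minimal sketch, reusing the `my_email` integration and recipient from the example above:

```sql
-- One-off test message through the my_email integration.
call system$send_email(
    'my_email',                 -- notification integration created above
    'user@example.com',         -- must be listed in ALLOWED_RECIPIENTS
    'Integration smoke test',
    'If you received this, the my_email integration works.',
    'text/plain'                -- optional MIME type; defaults to text/plain
);
```

The same function is what the solution below calls to deliver the query results.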

Crafting the Metadata Table

The linchpin of this solution is the metadata table, a structured blueprint that contains SQL commands awaiting execution and the accompanying email details.

                  
                    create or replace TABLE METADATA_EMAILNOTIFICATION(
                      ROWNUMBER NUMBER(38,0) identity(1,1) COMMENT 'key column, running number',
                      SQLQUERY VARCHAR(100000) COLLATE 'en-ci',
                      FREQUENCY VARCHAR(50) COLLATE 'en-ci', -- Weekly_5_16:30 (5 = day of the week, 16:30 = hh:mm) OR Daily_16:30 (16:30 = hh:mm) OR Every_2 (2 = hours between executions)
                      EMAILTO VARCHAR(500) COLLATE 'en-ci',
                      SUBJECT VARCHAR(500) COLLATE 'en-ci',
                      ISACTIVE BOOLEAN,
                      LASTRUNDATETIME TIMESTAMP_NTZ(9), -- end date/time of the last run
                      LASTRUNSTATUS BOOLEAN,
                      LASTRUNERROR VARCHAR(1000) COLLATE 'en-ci',
                      LASTSTARTDATETIME VARCHAR(1000) COLLATE 'en-ci',
                      COLUMNNAME VARCHAR(1000) COLLATE 'en-ci',
                      IsRunning BOOLEAN,
                      Send_as_blob_file BOOLEAN
                    );
                    
                  

The table comprises pivotal columns:

  • SQLQUERY: The SQL command to execute.
  • FREQUENCY: How often the SQL command runs. For instance, Weekly_5_16:30 means every Friday at 4:30 PM.
  • EMAILTO, SUBJECT: The recipients and subject line of the email.
  • COLUMNNAME: A comma-separated list of column headers used when the results are rendered as an HTML table in the email body.
  • ISACTIVE: Whether the row is active.
  • LASTRUNDATETIME: A timestamp of the command's most recent execution.
  • LASTRUNSTATUS: Success or failure of the latest run.
  • LASTRUNERROR: The error message, if any, from the most recent execution.
  • SEND_AS_BLOB_FILE: The pivotal switch. When set to true, the query results are written as a CSV file to Azure Blob Storage (through a stage on an integrated storage account) and the email carries a link instead of the data itself.
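The FREQUENCY parsing is done in SQL inside the scheduler procedure shown later, but the logic is easier to see in isolation. The sketch below mirrors it in plain JavaScript; `isDue` and its arguments are illustrative names, not part of the actual solution:

```javascript
// Hypothetical helper mirroring the FREQUENCY rules used by the scheduler.
// frequency: 'Weekly_5_16:30', 'Daily_16:30', or 'Every_2' (hours)
// lastStart: Date of the previous run, or null; now: Date of the current check
function isDue(frequency, lastStart, now) {
  const hhmm = d => String(d.getHours()).padStart(2, '0') + ':' +
                    String(d.getMinutes()).padStart(2, '0');
  if (frequency.startsWith('Weekly_')) {
    // Weekly_5_16:30 -> day-of-week 5 (Friday), at or after 16:30, once per day
    const day = Number(frequency.substring(7, 8));
    const time = frequency.substring(9);
    const ranToday = lastStart && lastStart.toDateString() === now.toDateString();
    return !ranToday && now.getDay() === day && hhmm(now) >= time;
  }
  if (frequency.startsWith('Daily_')) {
    // Daily_16:30 -> every day at or after 16:30, once per day
    const time = frequency.replace('Daily_', '');
    const ranToday = lastStart && lastStart.toDateString() === now.toDateString();
    return !ranToday && hhmm(now) >= time;
  }
  if (frequency.startsWith('Every_')) {
    // Every_2 -> at least 2 hours (120 minutes) since the last start
    const hours = Number(frequency.replace('Every_', ''));
    if (!lastStart) return true;
    return hours * 60 <= (now - lastStart) / 60000;
  }
  return false;
}
```

Note that both the SQL and this sketch use Snowflake's default day-of-week numbering (Sunday = 0), which happens to match JavaScript's `getDay()`.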

To populate this table, administrators simply insert rows describing the queries they want run:

                  
                    INSERT INTO METADATA_EMAILNOTIFICATION (SQLQUERY, FREQUENCY, EMAILTO, SUBJECT, ISACTIVE, COLUMNNAME,Send_as_blob_file)
                    VALUES
                    (
                      'SELECT top 5 ifnull(QUERY_ID,'''') as QUERY_ID, ifNULL(DATABASE_NAME,'''') as DATABASE_NAME, ifnull(USER_NAME,'''') as USER_NAME, ifnull(WAREHOUSE_SIZE,'''') as WAREHOUSE_SIZE, ifnull(QUERY_TYPE,'''') as QUERY_TYPE, ifnull(TOTAL_ELAPSED_TIME,0) as TOTAL_ELAPSED_TIME FROM snowflake.account_usage.QUERY_HISTORY',
                      'Every_2',
                      'user@example.com',
                      'Test Email 123...',  
                      1,
                      'QUERY_ID,DATABASE_NAME,USER_NAME,WAREHOUSE_SIZE,QUERY_TYPE',
                      1
                    );
                
                    INSERT INTO METADATA_EMAILNOTIFICATION (SQLQUERY, FREQUENCY, EMAILTO, SUBJECT, ISACTIVE, COLUMNNAME,Send_as_blob_file)
                    VALUES
                    (
                      'select top 10 start_time,end_time,warehouse_id,warehouse_name from SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY ',
                      'Every_1',
                      'user@example.com',
                      'Test Email 123 output as table...',  
                      1,
                      'start_time,end_time,warehouse_id,warehouse_name',
                      0
                    );
                  
                  

Utilizing Stored Procedures for Diverse Notifications

Two stored procedures do the heavy lifting:

  1. `GETMETADATA_EMAILNOTIFICATION()`: Scans the metadata table for SQL commands that are due to run, based on frequency, active status, and whether a run is already in progress.
  2. `emailnotification`: Invoked for each row the first procedure selects. Depending on the `SEND_AS_BLOB_FILE` flag, it either emails the query results as an HTML table or writes them as a CSV file to Azure Blob Storage and emails a link.
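One detail worth calling out before the code: the scheduler passes the SQL text to `emailnotification` inside a CALL statement's string literal, so it masks single quotes with a `~~` placeholder on the way out and restores them on the way in. A minimal sketch of that round trip (the function names are hypothetical; the procedures inline this with `replaceAll`):

```javascript
// Encode/decode used when passing a SQL string through a CALL literal.
function encodeForCall(sql) {
  return sql.split("'").join('~~');   // same effect as replaceAll("'", "~~")
}
function decodeInProcedure(sql) {
  return sql.split('~~').join("'");
}

const query = "SELECT ifnull(USER_NAME,'') FROM snowflake.account_usage.QUERY_HISTORY";
const wire = encodeForCall(query);    // no single quotes left; safe inside a '...' literal
console.log(decodeInProcedure(wire) === query);   // prints: true
```

The scheme assumes the stored query never itself contains the literal `~~`; a query that does would be corrupted by the decode step.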

Here is the code for the two procedures:

                  
                    CREATE OR REPLACE PROCEDURE GETMETADATA_EMAILNOTIFICATION()
                    RETURNS VARCHAR(16777216)
                    LANGUAGE JAVASCRIPT
                    EXECUTE AS OWNER
                    AS $$
                    var rownum="", sqlquery="", columnname="", resultdata="", sql_updatestatement="", sqlupdatequery="", Send_as_blob_file="";
                    var emails="", emailsubject="", result="";
                    var errstr="";
                    try
                    {
                         var sql_query = "SELECT ROWNUMBER,SQLQUERY,EMAILTO,SUBJECT,COLUMNNAME,Send_as_blob_file,FREQUENCY FROM METADATA_EMAILNOTIFICATION ";
                        sql_query+= " WHERE  ISACTIVE=1 and (IsRunning = 0 or IsRunning is null) And (( FREQUENCY like 'Weekly%'  AND ( LastStartDateTime IS NULL OR TO_DATE(LastStartDateTime) <> CURRENT_DATE()) ";
                        sql_query+= " And SUBSTRING(FREQUENCY,8,1) = TO_CHAR( DATE_PART(dw, CURRENT_TIMESTAMP()::DATE)) ";
                        sql_query+= " And SUBSTRING(FREQUENCY,10) <=  TO_CHAR(CURRENT_TIMESTAMP(), 'HH24:MI'))";
                        sql_query+= " Or ";
                        sql_query+= "  (( FREQUENCY like 'Daily%' ) AND ( LastStartDateTime IS NULL OR TO_DATE(LastStartDateTime) <> CURRENT_DATE()) And ";
                        sql_query+= " REPLACE(FREQUENCY,'Daily_','') <=  TO_CHAR(CURRENT_TIMESTAMP(),'HH24:MI')) ";
                        sql_query+= " Or ";
                        sql_query+= " (FREQUENCY LIKE 'Every_%' AND ( LastStartDateTime IS NULL OR ";  
                        sql_query+= " (REPLACE(FREQUENCY,'Every_','')*60) <=  TIMEDIFF(minute, LastStartDateTime, CURRENT_TIMESTAMP()))))" ;
                       
                        var sql_statement = snowflake.createStatement({sqlText: sql_query});    
                        var result_scan = sql_statement.execute();
                       
                        while (result_scan.next())
                        {
                            rownum=result_scan.getColumnValue(1);
                            sqlquery=result_scan.getColumnValue(2);
                            emails=result_scan.getColumnValue(3);
                            emailsubject=result_scan.getColumnValue(4);        
                            columnname=result_scan.getColumnValue(5);
                            Send_as_blob_file=result_scan.getColumnValue(6);
                                   
                            var sqlupdatequery_new = "update METADATA_EMAILNOTIFICATION set IsRunning =true,LastStartDateTime= CURRENT_TIMESTAMP() where rownumber="+rownum;        
                            var sql_updatestatement_new = snowflake.createStatement({sqlText:sqlupdatequery_new});
                            sql_updatestatement_new.execute();
                            var sqlquery_new = sqlquery.replaceAll("'","~~");
                            var call_sqlquery = "call emailnotification('"+ sqlquery_new+"','"+columnname+"','"+emails+"','"+ emailsubject+"',"+ Send_as_blob_file +")";         
                            var sql_statement_call = snowflake.createStatement({sqlText:call_sqlquery});
                            var resultdata = sql_statement_call.execute();
                           
                            resultdata.next();
                            result=resultdata.getColumnValue(1);          
                           
                            if(result.toLowerCase() =="success")
                            {                      
                                sqlupdatequery = "update METADATA_EMAILNOTIFICATION set LASTRUNDATETIME= CURRENT_TIMESTAMP(),lastrunerror='' ,LASTRUNSTATUS =true,IsRunning =false where rownumber="+rownum;        
                            }
                            else
                            {            
                                sqlupdatequery = "update METADATA_EMAILNOTIFICATION set LASTRUNDATETIME= CURRENT_TIMESTAMP() ,LASTRUNSTATUS =false ,IsRunning =false,lastrunerror='"+ result.replace(/'/g, "''") +"' ";
                                sqlupdatequery +="where rownumber="+rownum;
                            }
                           
                            sql_updatestatement = snowflake.createStatement({sqlText:sqlupdatequery});
                            sql_updatestatement.execute();      
                           
                        }
                        return "success";
                    }
                    catch (err)
                    {  
                        errstr = "Failed: Code: " + err.code + "\\n  State: " + err.state;
                        errstr += "\\n  Message: " + err.message;
                        errstr += "\\nStack Trace:\\n" + err.stackTraceTxt;
                        sqlupdatequery = "update METADATA_EMAILNOTIFICATION set LASTRUNSTATUS =false,IsRunning =false,lastrunerror='"+ errstr.replace(/'/g, "''") +"' where rownumber="+rownum;
                        sql_updatestatement = snowflake.createStatement({sqlText:sqlupdatequery});
                        sql_updatestatement.execute();
                        return errstr;
                    }
                    $$;
                    
                  
                    
                    CREATE OR REPLACE PROCEDURE emailnotification(QUERY VARCHAR(8000), COLUMNNAME VARCHAR(8000), EMAILS VARCHAR(1000), EMAILSUBJECT VARCHAR(1000), SEND_AS_BLOB_FILE BOOLEAN)
                    RETURNS VARCHAR(16777216)
                    LANGUAGE JAVASCRIPT
                    EXECUTE AS OWNER
                    AS $$
                    try
                    {
                        var resultdata = "";
                        var QUERY = QUERY.replaceAll("~~", "'");   // restore the quotes masked by the caller
                        var stmt = snowflake.createStatement({sqlText: QUERY});
                        var resultSet = stmt.execute();

                        if (!resultSet.next())
                        {
                            return "no records found";
                        }
                        if (SEND_AS_BLOB_FILE == 1)
                        {
                            // Build a timestamped file name (getMonth() is zero-based, hence the +1).
                            var today = new Date();
                            var date = today.getFullYear() + '_' + (today.getMonth() + 1) + '_' + today.getDate();
                            var time = today.getHours() + '_' + today.getMinutes() + '_' + today.getSeconds();
                            var filename = "METADATA_EMAILNOTIFICATION" + date + "_" + time + ".csv";
                            var fileurl = "https://myfiles.blob.core.windows.net/snowflakefiles/" + filename;

                            var copy_command_query = "COPY INTO @my_azure_stage/" + filename + " from ";
                            copy_command_query += " (" + QUERY + ")";
                            copy_command_query += " HEADER=TRUE";
                            copy_command_query += " SINGLE=TRUE";
                            copy_command_query += " OVERWRITE=TRUE;";

                            var sql_statement_copy = snowflake.createStatement({sqlText: copy_command_query});
                            sql_statement_copy.execute();

                            resultdata = "Output is available in the link below.<br><br>URL : " + fileurl;
                        }
                        else
                        {
                            // Render the result set as an HTML table; COLUMNNAME supplies the headers.
                            const myArray = COLUMNNAME.split(",");
                            resultdata = '<table border=1>';
                            resultdata += '<tr>';
                            for (var j = 0; j < myArray.length; j++)
                            {
                                resultdata += '<th>' + myArray[j] + '</th>';
                            }
                            resultdata += '</tr>';
                            // The first row was already fetched by the next() call above,
                            // so read the current row first and advance at the end.
                            for (var i = 1; i <= stmt.getRowCount(); i++)
                            {
                                resultdata += '<tr>';
                                for (var j = 0; j < myArray.length; j++)
                                {
                                    var str = resultSet.getColumnValue(j + 1);
                                    resultdata += '<td>' + str + '</td>';
                                }
                                resultdata += '</tr>';
                                resultSet.next();
                            }
                            resultdata += '</table>';
                        }
                        if (resultdata != "")
                        {
                            var sqlquery = "call system$send_email('my_email','" + EMAILS + "','" + EMAILSUBJECT + "','" + resultdata + "','text/html')";
                            var sql_statement = snowflake.createStatement({sqlText: sqlquery});
                            sql_statement.execute();
                        }
                        return "success";
                    }
                    catch (err)
                    {
                        var errstr = "Failed: Code: " + err.code + "\\n  State: " + err.state;
                        errstr += "\\n  Message: " + err.message;
                        errstr += "\\nStack Trace:\\n" + err.stackTraceTxt;
                        return errstr;
                    }
                    $$;
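The HTML table assembly inside `emailnotification` can be exercised outside Snowflake with plain JavaScript. The sketch below is a hypothetical standalone version of the same loop, taking rows as arrays of values instead of a Snowflake ResultSet:

```javascript
// Standalone version of the HTML table that emailnotification builds.
// buildHtmlTable is an illustrative name; rows is an array of arrays here.
function buildHtmlTable(columnNames, rows) {
  let html = '<table border=1><tr>';
  for (const name of columnNames) {
    html += '<th>' + name + '</th>';    // one header cell per COLUMNNAME entry
  }
  html += '</tr>';
  for (const row of rows) {
    html += '<tr>';
    for (const cell of row) {
      html += '<td>' + cell + '</td>';  // one data cell per selected column
    }
    html += '</tr>';
  }
  return html + '</table>';
}
```

In the procedure, the resulting string becomes the body argument of `SYSTEM$SEND_EMAIL`, sent with the `text/html` MIME type so mail clients render it as a table.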

Scheduling via Snowflake Tasks

To automate the whole pipeline, a Snowflake task is created to run every sixty seconds:

                  
                    create or replace task my_job_task
                    warehouse = mywh
                    schedule = '1 minute'
                  as
                    call GETMETADATA_EMAILNOTIFICATION();

                    -- Tasks are created in a suspended state; resume it so the schedule takes effect.
                    alter task my_job_task resume;
                  
                    
                  

Although the task fires every minute, GETMETADATA_EMAILNOTIFICATION() only acts on rows that meet the scheduling prerequisites, so most invocations do very little work and system efficiency is preserved.
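To confirm the task is actually firing, its run history can be inspected with Snowflake's TASK_HISTORY table function. A quick check, assuming the task name from the example above:

```sql
-- Most recent executions of the scheduler task, newest first.
select name, state, scheduled_time, error_message
from table(information_schema.task_history(task_name => 'MY_JOB_TASK'))
order by scheduled_time desc
limit 10;
```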

Empowering Modern Data Teams

The advantages for data teams are hard to overlook:

  • System administrators gain insights on account engagement or can fetch granular reports on extended query sessions.
  • Governance teams can swiftly pinpoint non-standard joins and circulate alerts.
  • Engineers can implement regular data quality checks.
  • Adaptability is a highlight. Beyond emails, it’s feasible to adapt this model for tasks such as curtailing extensive queries or orchestrating data purges.

By blending Snowflake's built-in capabilities with a thoughtfully structured solution, data teams can take far better command of their data environment.