3 minute read

I just would like to have DMARC report into 1 folder, and make them packed into CSV, as popular DMARC processing SaaS is a little bit costy.

code it into dmarc.py

import os
import gzip
import zipfile
from lxml import etree
import csv
import datetime

def extract_info_from_filename(filename):
    """
    Extracts start and end datetimes from a given filename.
    Args:
    - filename (str): The filename containing Unix timestamps for start and end times.
    - filename expecting..: 'google.com!domainname!1707004800!1707091199.xml'
    Returns:
    - tuple: A tuple containing start and end datetime objects.
    """
    # Assuming filenames are like report_starttime_endtime.xml
    # Extract timestamps from filename
    parts = filename.replace(".gz", "").replace(".xml", "").split("!")
    if len(parts) >= 4:
        to_domain, from_domain, start_timestamp, end_timestamp, *other_abysss = parts
        try:
            start_datetime = datetime.datetime.fromtimestamp(int(start_timestamp))
            end_datetime   = datetime.datetime.fromtimestamp(int(end_timestamp))
        except ValueError:
            start_datetime = datetime.datetime.fromtimestamp(1)
            end_datetime   = datetime.datetime.fromtimestamp(2)
    else:
        to_domain          = 'to.example.com'
        from_domain        = 'from.example.com'
        start_datetime     = datetime.datetime.fromtimestamp(1)
        end_datetime       = datetime.datetime.fromtimestamp(2)
    return to_domain, from_domain, start_datetime, end_datetime

def extract_and_parse_files(directory, output_csv):
    # Open or create the CSV file for output
    with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
        csvwriter = csv.writer(csvfile)
        # Write the CSV header
        csvwriter.writerow(['Filename', 'src', 'count', 'Header-From Domain', 'SPF Domnain', 'SPF check', 'SPF alignment', 'DKIM Domain', 'DKIM check', 'DKIM alignment' , 'Detail', 'DMARC disposition', 'To', 'From', 'Start', 'End'])
        # Process each file in the directory
        for root, dirs, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(root, file)
                # Check if the file is a .gz or .zip file and extract XML content
                if file.endswith('.gz'):
                    with gzip.open(file_path, 'rb') as f:
                        xml_content = f.read()
                        parse_xml_content(xml_content, file, csvwriter)
                elif file.endswith('.zip'):
                    with zipfile.ZipFile(file_path, 'r') as zip_ref:
                        for name in zip_ref.namelist():
                            with zip_ref.open(name) as f:
                                xml_content = f.read()
                                parse_xml_content(xml_content, name, csvwriter)

def parse_xml_content(xml_content, filename, csvwriter):
    # Parse the XML content
    root = etree.fromstring(xml_content)
    # Iterate through the records in the XML
    for record in root.findall('.//record'):
        row              = record.find('.//row')
        source_ip           = row.find('.//source_ip').text
        count               = row.find('.//count').text
        policy_evaluated    = row.find('.//policy_evaluated')
        identifiers      = record.find('.//identifiers')
        header_from         = identifiers.find('.//header_from').text if identifiers is not None else 'Unknown'
        # DMARC evaluation
        dmarc_fail = policy_evaluated.find('.//disposition').text
        # SPF evaluation
        spf_result       = record.find('.//auth_results//spf//result').text if record.find('.//auth_results//spf//result') is not None else 'n/a'
        spf_fail         = spf_result
        spf_domain       = record.find('.//auth_results//spf//domain').text if record.find('.//auth_results//spf//domain') is not None else 'n/a'
        spf_aligned      = 'pass' if spf_domain == header_from else 'fail'

        # # DKIM evaluation
        # dkim_result      = record.find('.//auth_results//dkim//result').text if record.find('.//auth_results//dkim//result') is not None else 'n/a'
        # dkim_fail        = dkim_result
        # dkim_domain      = record.find('.//auth_results//dkim//domain').text if record.find('.//auth_results//dkim//domain') is not None else 'n/a'
        # dkim_aligned     = 'pass' if dkim_domain == header_from else 'fail'

        dkim_detail  = ''
        dkim_fail    = ''
        dkim_domain  = ''
        dkim_aligned = ''
        for dkim in record.findall('.//auth_results//dkim'):
            d_result = dkim.find('.//result').text if dkim.find('.//result') is not None else 'none'
            d_domain = dkim.find('.//domain').text if dkim.find('.//domain') is not None else 'none'
            d_aligned= 'pass' if d_domain == header_from else 'fail'
            dkim_detail +=  d_result + ';' + d_domain + ';' + d_aligned + ';  '
            if dkim_fail == '' and dkim_domain == '' and dkim_aligned == '':
                dkim_fail    = d_result
                dkim_domain  = d_domain
                dkim_aligned = d_aligned
            else:
                if d_result == 'pass' and d_aligned == 'pass':
                    dkim_fail    = d_result
                    dkim_domain  = d_domain
                    dkim_aligned = d_aligned
            if d_aligned == 'fail':
                dkim_detail += '\n' + etree.tostring(record, pretty_print=True).decode() + '\n'

        # info from filename
        to_domain, from_domain, start_datetime, end_datetime = extract_info_from_filename(filename)

        # Write the record to the CSV
        csvwriter.writerow(['\'' + filename + '\'' , source_ip, count, header_from, spf_domain , spf_fail, spf_aligned, dkim_domain ,  dkim_fail, dkim_aligned , dkim_detail, dmarc_fail, to_domain, from_domain, start_datetime, end_datetime])
def main():
    directory = 'dmarc-reps'  # Update this path to where your DMARC reports are stored
    output_csv = 'dmarc_results.csv'  # Output CSV file name
    extract_and_parse_files(directory, output_csv)
if __name__ == "__main__":
    main()

How to use

install python

sudo apt install python3 python-is-python3
pip install gzip zipfile lxml
code dmarc.py
mkdir dmarc-reps

Place gz or zip files into dmarc-reps folder, upon creation of dmarc.oy and dmarc-reps folder.

Run it and open dmarc_result.csv

python dmarc.py
code dmarc_result.csv

It looks like this, and you can check DMARC pass or not, checking either of SPF/SPF-alignment or DKIM/DKIM-alignment pass or not. image

Enjoy! and if you have questions, please DM on https://twitter.com/rtree

Updated: