# coding=utf-8
"""Helper functions for parsing csv reports."""
import csv
EXPECTED_DEPLOYMENTS_REPORT_ID_FIELDS = (
"Report ID",
"Report Type",
"Report Version",
"Report Platform ID",
)
EXPECTED_DETAIL_REPORT_ID_FIELDS = EXPECTED_DEPLOYMENTS_REPORT_ID_FIELDS + ("Number Sources",)
[docs]def normalize_csv_report(f, header_range, header_lines, report_type="deployments"):
"""Extract and normalize csv report to match the returned JSON report.
:param f: A file object for the csv
:param header_range: An int specifing the range that the head extends into
the csv file.
:param header_lines: A list of tuples containg the index pairs for the
lines to be used as the key/value pairs of each header info dictionary
(ex: [(0,1), (3,4)] ).
:param report_type: A string that defines what type of report object to
return, 'deployments' or 'detail'. Defaults to 'deployments'.
"""
# First grab the header information from the file object.
report_headers = [f.readline() for _ in range(header_range)]
# Next, extract header information into a list of dictonaries (1 dictionary
# per header set)
header_info = extract_key_value_lines(report_headers, header_lines)
# Now that we extracted the report information we can use CSV reader to
# read the system fingerprints information
reader = csv.DictReader(f)
if report_type == "deployments":
return normalize_deployments_report(header_info, reader)
else:
return normalize_detail_report(header_info, reader)
[docs]def normalize_deployments_report(header_info, reader):
"""Normalize report info into a deployments report.
Takes information from report_info dict, and reader and returns a
deployments report.
:param header_info: A list of dictonaries, which each dictonary containing
the information of a header.
:param reader: A DictReader containing the parsed content (remainder after
header section) of a report.
"""
report_info = header_info[0]
# Ensure extracted fields match expected
expected_keys = [x.lower().replace(" ", "_") for x in EXPECTED_DEPLOYMENTS_REPORT_ID_FIELDS]
assert sorted(report_info.keys()) == sorted(
expected_keys
), "Extracted Report Fields didn't match expected list"
report = {
"report_id": report_info["report_id"],
"report_type": report_info["report_type"],
"report_version": report_info["report_version"],
"report_platform_id": report_info["report_platform_id"],
"system_fingerprints": [row for row in reader],
}
return report
[docs]def normalize_detail_report(header_info, reader):
"""Normalize report info into a detail report.
Takes information from report_info dict, and reader and returns a
detail format report
:param header_info: A list of dictonaries, which each dictonary containing
the information of a header.
:param reader: A DictReader containing the parsed content (remainder after
header section) of a report.
"""
# The first dictionary grabbed contains the report info
report_info = header_info[0]
# The second contains the source header info
source_info = header_info[1]
# Ensure extracted fields match expected
expected_keys = [x.lower().replace(" ", "_") for x in EXPECTED_DETAIL_REPORT_ID_FIELDS]
assert sorted(report_info.keys()) == sorted(
expected_keys
), "Extracted Report Fields didn't match expected list"
report = {
"id": report_info["report_id"],
"report_type": report_info["report_type"],
"sources": [
{
"facts": [row for row in reader],
"server_id": source_info["server_identifier"],
"source_name": source_info["source_name"],
"source_type": source_info["source_type"],
}
],
}
return report
[docs]def zip_line_pairs(input_lines, key_ind, value_ind, delim=","):
"""Take a list of csv strings, and combine 2 of them into a dictionary.
:param input_lines: a list of csv lines
:param key_ind: the index value for which line in ``input_lines`` should be
used for the keys of the dictionary.
:param value_ind: the index value for which line in ``input_lines`` should
be used for the values of the dictionary.
:param delim: the delimiter to split the lines with (defaults to ',').
"""
header_keys = input_lines[key_ind].strip().split(delim)
header_values = input_lines[value_ind].strip().split(delim)
# Dynamically zip the header items into a dictionary.
return {key.lower().replace(" ", "_"): value for key, value in zip(header_keys, header_values)}