Source code for hatchet.readers.caliper_reader

# Copyright 2017-2020 Lawrence Livermore National Security, LLC and other
# Hatchet Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: MIT

import json
import sys
import re
import subprocess
import os

import pandas as pd

import hatchet.graphframe
from hatchet.node import Node
from hatchet.graph import Graph
from hatchet.frame import Frame
from hatchet.util.timer import Timer
from hatchet.util.executable import which


class CaliperReader:
    """Read in a Caliper file (`cali` or split JSON) or file-like object."""

    def __init__(self, filename_or_stream, query=""):
        """Read from Caliper files (`cali` or split JSON).

        Args:
            filename_or_stream (str or file-like): name of a `cali` or
                `cali-query` split JSON file, OR an open file object
            query (str): cali-query arguments (for cali file)
        """
        self.filename_or_stream = filename_or_stream
        self.filename_ext = ""
        self.query = query

        self.json_data = {}
        self.json_cols = {}
        self.json_cols_mdata = {}
        self.json_nodes = {}

        self.idx_to_label = {}
        self.idx_to_node = {}

        self.timer = Timer()
        self.nid_col_name = "nid"

        if isinstance(self.filename_or_stream, str):
            _, self.filename_ext = os.path.splitext(filename_or_stream)

    def read_json_sections(self):
        # if cali-query exists, extract data from .cali to a file-like object
        if self.filename_ext == ".cali":
            cali_query = which("cali-query")
            if not cali_query:
                raise ValueError("from_caliper() needs cali-query to query .cali file")
            cali_json = subprocess.Popen(
                [cali_query, "-q", self.query, self.filename_or_stream],
                stdout=subprocess.PIPE,
            )
            self.filename_or_stream = cali_json.stdout

        # if filename_or_stream is a str, then open the file, otherwise
        # directly load the file-like object
        if isinstance(self.filename_or_stream, str):
            with open(self.filename_or_stream) as cali_json:
                json_obj = json.load(cali_json)
        else:
            json_obj = json.loads(self.filename_or_stream.read().decode("utf-8"))

        # read various sections of the Caliper JSON file
        self.json_data = json_obj["data"]
        self.json_cols = json_obj["columns"]
        self.json_cols_mdata = json_obj["column_metadata"]
        self.json_nodes = json_obj["nodes"]

        # decide which column to use as the primary path hierarchy
        # first preference to callpath if available
        if "source.function#callpath.address" in self.json_cols:
            self.path_col_name = "source.function#callpath.address"
            self.node_type = "function"
        elif "path" in self.json_cols:
            self.path_col_name = "path"
            self.node_type = "region"
        else:
            sys.exit("No hierarchy column in input file")

        # remove data entries containing None in `path` column (null in json file)
        # first, get column where `path` data is
        # then, parse json_data list of lists to identify lists containing None
        # in `path` column
        path_col = self.json_cols.index(self.path_col_name)
        entries_to_remove = []
        for sublist in self.json_data:
            if sublist[path_col] is None:
                entries_to_remove.append(sublist)
        # then, remove them from the json_data list
        for i in entries_to_remove:
            self.json_data.remove(i)

        # change column names
        for idx, item in enumerate(self.json_cols):
            if item == self.path_col_name:
                # this column is just a pointer into the nodes section
                self.json_cols[idx] = self.nid_col_name
            # make other columns consistent with other readers
            if item == "mpi.rank":
                self.json_cols[idx] = "rank"
            if item == "module#cali.sampler.pc":
                self.json_cols[idx] = "module"
            if item == "sum#time.duration" or item == "sum#avg#sum#time.duration":
                self.json_cols[idx] = "time"
            if (
                item == "inclusive#sum#time.duration"
                or item == "sum#avg#inclusive#sum#time.duration"
            ):
                self.json_cols[idx] = "time (inc)"

        # make list of metric columns
        self.metric_columns = []
        for idx, item in enumerate(self.json_cols_mdata):
            if self.json_cols[idx] != "rank" and item["is_value"] is True:
                self.metric_columns.append(self.json_cols[idx])
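
    # Illustrative sketch (not part of the original module): a minimal
    # Caliper split-JSON input of the shape read_json_sections() expects.
    # The section keys mirror the ones accessed above; the values are
    # invented for illustration.
    #
    #     {
    #       "data": [[0, 42.0], [1, 7.0]],
    #       "columns": ["path", "sum#time.duration"],
    #       "column_metadata": [{"is_value": false}, {"is_value": true}],
    #       "nodes": [
    #         {"label": "main", "column": "path"},
    #         {"label": "foo", "column": "path", "parent": 0}
    #       ]
    #     }
    #
    # The "path" entries in "data" are integer indices into "nodes", which
    # is why that column is renamed to "nid" above.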

    def create_graph(self):
        list_roots = []

        # find nodes in the nodes section that represent the path hierarchy
        for idx, node in enumerate(self.json_nodes):
            node_label = node["label"]
            self.idx_to_label[idx] = node_label

            if node["column"] == self.path_col_name:
                if "parent" not in node:
                    # since this node does not have a parent, this is a root
                    graph_root = Node(
                        Frame({"type": self.node_type, "name": node_label}), None
                    )
                    list_roots.append(graph_root)

                    node_dict = {
                        self.nid_col_name: idx,
                        "name": node_label,
                        "node": graph_root,
                    }
                    self.idx_to_node[idx] = node_dict
                else:
                    parent_hnode = (self.idx_to_node[node["parent"]])["node"]
                    hnode = Node(
                        Frame({"type": self.node_type, "name": node_label}),
                        parent_hnode,
                    )
                    parent_hnode.add_child(hnode)

                    node_dict = {
                        self.nid_col_name: idx,
                        "name": node_label,
                        "node": hnode,
                    }
                    self.idx_to_node[idx] = node_dict

        return list_roots
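
    # Illustrative note (not part of the original module): applied to the
    # sample JSON sketched above, create_graph() sees that node 0 ("main")
    # has no "parent" key and makes it a root, while node 1 ("foo") names
    # parent 0 and is attached as a child of "main" via add_child().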

    def read(self):
        """Read the caliper JSON file to extract the calling context tree."""
        with self.timer.phase("read json"):
            self.read_json_sections()

        with self.timer.phase("graph construction"):
            list_roots = self.create_graph()

        # create a dataframe of metrics from the data section
        self.df_json_data = pd.DataFrame(self.json_data, columns=self.json_cols)

        # map non-numeric columns to their mappings in the nodes section
        for idx, item in enumerate(self.json_cols_mdata):
            if item["is_value"] is False and self.json_cols[idx] != self.nid_col_name:
                if self.json_cols[idx] == "sourceloc#cali.sampler.pc":
                    # split source file and line number into two columns
                    self.df_json_data["file"] = self.df_json_data[
                        self.json_cols[idx]
                    ].apply(
                        lambda x: re.match(
                            r"(.*):(\d+)", self.json_nodes[x]["label"]
                        ).group(1)
                    )
                    self.df_json_data["line"] = self.df_json_data[
                        self.json_cols[idx]
                    ].apply(
                        lambda x: re.match(
                            r"(.*):(\d+)", self.json_nodes[x]["label"]
                        ).group(2)
                    )
                    self.df_json_data.drop(self.json_cols[idx], axis=1, inplace=True)
                    sourceloc_idx = idx
                else:
                    self.df_json_data[self.json_cols[idx]] = self.df_json_data[
                        self.json_cols[idx]
                    ].apply(lambda x: self.json_nodes[x]["label"])

        # since we split sourceloc, we should update json_cols and
        # json_cols_mdata
        if "sourceloc#cali.sampler.pc" in self.json_cols:
            self.json_cols.pop(sourceloc_idx)
            self.json_cols_mdata.pop(sourceloc_idx)
            self.json_cols.append("file")
            self.json_cols.append("line")
            self.json_cols_mdata.append({"is_value": False})
            self.json_cols_mdata.append({"is_value": False})

        max_nid = self.df_json_data[self.nid_col_name].max()

        if "line" in self.df_json_data.columns:
            # split nodes that have multiple file:line numbers to have a child
            # each with a unique file:line number
            unique_nodes = self.df_json_data.groupby(self.nid_col_name)
            df_concat = [self.df_json_data]

            for nid, super_node in unique_nodes:
                line_groups = super_node.groupby("line")
                # only need to do something if there are more than one
                # file:line number entries for the node
                if len(line_groups.size()) > 1:
                    sn_hnode = self.idx_to_node[nid]["node"]

                    for line, line_group in line_groups:
                        # create the node label
                        file_path = (line_group.head(1))["file"].item()
                        file_name = os.path.basename(file_path)
                        node_label = file_name + ":" + line

                        # create a new hatchet node
                        max_nid += 1
                        idx = max_nid
                        hnode = Node(
                            Frame(
                                {"type": "statement", "file": file_path, "line": line}
                            ),
                            sn_hnode,
                        )
                        sn_hnode.add_child(hnode)

                        node_dict = {
                            self.nid_col_name: idx,
                            "name": node_label,
                            "node": hnode,
                        }
                        self.idx_to_node[idx] = node_dict

                        # change nid of the original node to new node in place
                        for index, row in line_group.iterrows():
                            self.df_json_data.loc[index, "nid"] = max_nid

                    # add new row for original node
                    node_copy = super_node.head(1).copy()
                    for cols in self.metric_columns:
                        node_copy[cols] = 0
                    df_concat.append(node_copy)

            # concatenate all the newly created dataframes with
            # self.df_json_data
            self.df_fixed_data = pd.concat(df_concat)
        else:
            self.df_fixed_data = self.df_json_data

        # create a dataframe with all nodes in the call graph
        self.df_nodes = pd.DataFrame.from_dict(data=list(self.idx_to_node.values()))

        # add missing intermediate nodes to the df_fixed_data dataframe
        if "rank" in self.json_cols:
            self.num_ranks = self.df_fixed_data["rank"].max() + 1
            rank_list = range(0, self.num_ranks)

        # create a standard dict to be used for filling all missing rows
        default_metric_dict = {}
        for idx, item in enumerate(self.json_cols_mdata):
            if self.json_cols[idx] != self.nid_col_name:
                if item["is_value"] is True:
                    default_metric_dict[self.json_cols[idx]] = 0
                else:
                    default_metric_dict[self.json_cols[idx]] = None

        # create a list of dicts, one dict for each missing row
        missing_nodes = []
        for iteridx, row in self.df_nodes.iterrows():
            # check if df_nodes row exists in df_fixed_data
            metric_rows = self.df_fixed_data.loc[
                self.df_fixed_data[self.nid_col_name] == row[self.nid_col_name]
            ]
            if "rank" not in self.json_cols:
                if metric_rows.empty:
                    # add a single row
                    node_dict = dict(default_metric_dict)
                    node_dict[self.nid_col_name] = row[self.nid_col_name]
                    missing_nodes.append(node_dict)
            else:
                if metric_rows.empty:
                    # add a row per MPI rank
                    for rank in rank_list:
                        node_dict = dict(default_metric_dict)
                        node_dict[self.nid_col_name] = row[self.nid_col_name]
                        node_dict["rank"] = rank
                        missing_nodes.append(node_dict)
                elif len(metric_rows) < self.num_ranks:
                    # add a row for each missing MPI rank
                    present_ranks = metric_rows["rank"].values
                    missing_ranks = [x for x in rank_list if x not in present_ranks]
                    for rank in missing_ranks:
                        node_dict = dict(default_metric_dict)
                        node_dict[self.nid_col_name] = row[self.nid_col_name]
                        node_dict["rank"] = rank
                        missing_nodes.append(node_dict)

        self.df_missing = pd.DataFrame.from_dict(data=missing_nodes)
        self.df_metrics = pd.concat([self.df_fixed_data, self.df_missing])

        # create a graph object once all the nodes have been added
        graph = Graph(list_roots)
        graph.enumerate_traverse()

        # merge the metrics and node dataframes on the nid column
        with self.timer.phase("data frame"):
            dataframe = pd.merge(self.df_metrics, self.df_nodes, on=self.nid_col_name)

        # set the index to be a MultiIndex
        indices = ["node"]
        if "rank" in self.json_cols:
            indices.append("rank")
        dataframe.set_index(indices, inplace=True)
        dataframe.sort_index(inplace=True)

        # create list of exclusive and inclusive metric columns
        exc_metrics = []
        inc_metrics = []
        for column in self.metric_columns:
            if "(inc)" in column:
                inc_metrics.append(column)
            else:
                exc_metrics.append(column)

        return hatchet.graphframe.GraphFrame(
            graph, dataframe, exc_metrics, inc_metrics
        )
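
# Minimal usage sketch (an illustration, not part of the original module):
# "profile.json" is a hypothetical split-JSON file already produced by
# `cali-query`. Passing a raw `.cali` file instead requires `cali-query`
# on the PATH plus a query string.
#
#     from hatchet.readers.caliper_reader import CaliperReader
#
#     reader = CaliperReader("profile.json")
#     gf = reader.read()  # returns a hatchet GraphFrame
#     print(gf.dataframe)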