#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# seniority_list is an analytical tool used when seniority-based work
# groups merge. It brings modern data science to the area of labor
# integration, utilizing the powerful data analysis capabilities of Python
# scientific computing.
# Copyright (C) 2016-2017 Robert E. Davison, Ruby Data Systems Inc.
# Please direct consulting inquiries to: rubydatasystems@fastmail.net
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
.. module:: list_builder
   :synopsis: The list_builder module contains routines to build list
   orderings from the master list data as a starting point for further
   analysis and/or list editing. Lists may be built by various weighting
   and sorting methods.

Typical workflow:

prepare_master_list - add columns to the master list which can be used
as hybrid list factors. These columns are longevity, job, and
percentage related.

build_list - select, apply weighting, organize, and sort a "hybrid" list.

Note: the sort_eg_attributes and sort_and_rank functions are helper
functions which may also be used as standalone functions.

sort_eg_attributes - normally used within the prepare_master_list
function. Sorts date-type attributes by employee group to form a
chronological order within each group without disturbing the order of
the other columns (also works with any other attribute if needed).
Typical date columns to prepare in this manner are doh and ldate.

sort_and_rank - a helper function for the build_list function.

The build_list function stores a pickle file which may then be used as
an input to the compute_measures script.
Example:
.. code:: python
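    # a sketch of the preceding workflow steps (assumes the pickled
    # master list and settings dictionary exist in the dill folder;
    # the weightings shown are illustrative only):
    import list_builder as lb
    master = lb.prepare_master_list()
    lb.build_list(master, ['ldate', 'jobp'], [.50, .50])
    # then run the main calculation script with the new hybrid list: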
%run compute_measures.py hybrid
.. moduleauthor:: Bob Davison <rubydatasystems@fastmail.net>
'''
import pandas as pd
import numpy as np
import functions as f
import warnings
def prepare_master_list(name_int_demo=False,
                        pre_sort=True):
'''Add attribute columns to a master list. One or more of these columns
will be used by the build_list function to construct
a "hybrid" list ordering.
    Employee groups must be listed in seniority order in relation to
    employees from the same group. Order between groups is unimportant
    at this step.

    New columns added: ['age', 's_lmonths', 'jnum', 'job_count',
    'rank_in_job', 'jobp', 'eg_number', 'eg_spcnt']

    inputs

    name_int_demo
        if True, lname strings are converted to integers and then to
        corresponding alpha-numeric percentages for constructing lists
        by last name. This is a demo only, to show that any attribute
        may be used as a list weighting factor.
    pre_sort
        sort the doh and ldate columns of the master data dataframe
        prior to beginning any calculations. This sort has no effect
        on the other columns. The s_lmonths column will be calculated
        on the sorted ldate data.
Job-related attributes are referenced to job counts from the settings
dictionary.
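
    Example (a minimal sketch - assumes the pickled master list and
    settings dictionary exist within the dill folder):

    .. code:: python

        master = prepare_master_list()
        master[['ldate', 's_lmonths', 'jnum', 'jobp', 'eg_spcnt']].head()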
'''
master_ = pd.read_pickle('dill/master.pkl')
if pre_sort:
sort_eg_attributes(master_)
master = master_[(master_.line == 1) | (master_.fur == 1)].copy()
sdict = pd.read_pickle('dill/dict_settings.pkl')
# AGE and LONGEVITY
master['age'] = f.starting_age(master.retdate, sdict['starting_date'])
    master['s_lmonths'] = f.longevity_at_startdate(list(master['ldate']),
                                                   sdict['starting_date'],
                                                   return_as_months=True)
jobs_list = []
if sdict['enhanced_jobs']:
# use job dictionary(jd) from settings dictionary
eg_counts, j_changes = f.convert_to_enhanced(sdict['eg_counts'],
sdict['j_changes'],
sdict['jd'])
else:
eg_counts = sdict['eg_counts']
    # make a list of stovepipe jobs for each group (from the settings
    # dictionary job counts)
    for i, jobs in enumerate(eg_counts, 1):
        # the second input determines the length of the zero
        # array formed (possible excess)
        jobs_list.append(
            f.make_stovepipe_jobs_from_jobs_arr(jobs,
                                                sum((master.eg == i) &
                                                    ((master.line == 1) |
                                                     (master.fur == 1)))))
fur_level = f.max_of_nested_lists(jobs_list) + 1
jobs = np.array(jobs_list)
# mark unassigned as furloughed (from zero to fur_level)
for job_arr in jobs:
np.put(job_arr, np.where(job_arr == 0)[0], fur_level)
egs = master.eg.values
jnums = np.zeros(egs.size)
job_count = np.zeros(egs.size)
# JNUM and JOB_COUNT data prep
    for i, job_arr in enumerate(jobs, 1):
        job_nums, counts = np.unique(job_arr, return_counts=True)
        for job, count in zip(job_nums, counts):
            np.put(job_count,
                   np.where((jnums == 0) & (egs == i))[0][:count],
                   count)
            np.put(jnums,
                   np.where((jnums == 0) & (egs == i))[0][:count],
                   job)
    # employee group count (for the eg_spcnt column)
    eg_counts = np.zeros(egs.size)
    eg_nums, counts = np.unique(master.eg, return_counts=True)
    for eg, count in zip(eg_nums, counts):
        np.put(eg_counts, np.where(egs == eg)[0], count)
# Attribute columns assignment
master['jnum'] = jnums.astype(int)
master['job_count'] = job_count.astype(int)
master['rank_in_job'] = master.groupby(['eg', 'jnum']).cumcount() + 1
master['jobp'] = (master.rank_in_job /
master.job_count) + master.jnum - .0001
master['eg_number'] = master.groupby('eg').cumcount() + 1
master['eg_count'] = eg_counts.astype(int)
master['eg_spcnt'] = master.eg_number / master.eg_count
if name_int_demo:
master['name_int'] = names_to_integers(master.lname)[2]
master.pop('eg_count')
return master
def build_list(df,
               measure_list,
               weight_list,
               show_weightings=False,
               hide_rank_cols=True,
               return_df=False):
'''Construct a "hybrid" list ordering.
Note: first run the "prepare_master_list" function and use the output
for the "df" input here.
    Combine and sort various attributes according to variable
    multipliers to produce a list order. The list order output is
    based on a sliding scale of priority assigned among the attributes.
The default output is a dataframe containing the new hybrid list order
and employee numbers (empkey) only, and is written to disk as
'dill/p_hybrid.pkl'.
The entire hybrid-sorted dataframe may be returned by setting the
"return_df" input to True. This does not affect the hybrid list order
dataframe - it is produced and stored regardless of the "return_df"
option.
inputs
df
the prepared dataframe output of the prepare_master_list function
measure_list
        a list of attributes that form the basis of the final sorted
        list. The employee groups will be combined, sorted, and ranked
        according to these attributes one by one. Each time a ranked
        list is formed for the current attribute, a weighting is
        applied to that rank column. The final list order is the rank
        of the cumulative total of the weighted attribute rank columns.
weight_list
a list of decimal weightings to apply to each corresponding
measure within the measure_list. Normally the total of the
weight_list should be 1, but any numbers may be used as weightings
since the final result is a ranking of a cumulative total.
show_weightings
        add columns to display the product of the weight/column
        multiplication
return_df
option to return the new sorted hybrid dataframe as output.
Normally, the function produces a list ordering file which is
written to disk and used as an input by the compute measures
script.
hide_rank_cols
        remove the attribute rank columns from the dataframe unless
        visual review is desired
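
    Example (a sketch - the weightings shown are illustrative only):

    .. code:: python

        master = prepare_master_list()
        # 50% longevity date, 35% job percentage, 15% longevity months
        build_list(master, ['ldate', 'jobp', 's_lmonths'],
                   [.50, .35, .15])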
'''
# options TODO: (for developer)
# , absolute=True,
# invert=False, include_inactives=False, include_fur=True,
# cut=False, qcut=False, remove_retired=True):
#
# The attribute values from the employee groups may be evenly ratioed
# together or combined on an absolute basis where the actual values
# determine the positioning.
    df = df.copy()
    df['hybrid'] = 0
    for measure, weight in zip(measure_list, weight_list):
        sort_and_rank(df, measure)
        if show_weightings:
            df[measure + '_wgt'] = df[measure + '_rank'] * weight
            df['hybrid'] += df[measure + '_wgt']
        else:
            df['hybrid'] += np.array(df[measure + '_rank']) * weight
df = sort_and_rank(df, 'hybrid')
if hide_rank_cols:
for measure in measure_list:
df.pop(measure + '_rank')
df['idx'] = df.hybrid_rank
df.pop('hybrid_rank')
else:
df['idx'] = np.arange(len(df), dtype=int) + 1
df.set_index('empkey', drop=True, inplace=True)
df.idx = df.idx.astype(int)
df[['idx']].to_pickle('dill/p_hybrid.pkl')
if return_df:
cols = df.columns.tolist()
cols.insert(0, cols.pop(cols.index('idx')))
df = df.reindex(columns=cols)
return df
def sort_eg_attributes(df, attributes=['doh', 'ldate'],
                       reverse_list=[0, 0],
                       add_columns=False):
'''Sort master list attribute columns by employee group in preparation
for list construction. The overall master list structure and order is
unaffected, only the selected attribute columns are sorted (normally
date-related columns such as doh or ldate)
inputs
df
The master data dataframe (does not need to be sorted)
attributes
columns to sort by eg (inplace)
reverse_list
If an attribute is to be sorted in reverse order (descending),
use a '1' in the list position corresponding to the position of
the attribute within the attributes input
add_columns
If True, an additional column for each sorted attribute will be
added to the resultant dataframe, with the suffix '_sort' added
to it.
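
    Example (a sketch, using the master data dataframe):

    .. code:: python

        # sort doh ascending and ldate descending within each group
        sort_eg_attributes(master, attributes=['doh', 'ldate'],
                           reverse_list=[0, 1])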
'''
    date_cols = [col for col in df
                 if df[col].dtype == 'datetime64[ns]']
try:
df.sort_values(['eg', 'eg_number'], inplace=True)
except LookupError:
df.sort_values(['eg', 'eg_order'], inplace=True)
egs = df.eg.values
    for i, measure in enumerate(attributes):
data = df[measure].values
measure_col = np.empty_like(data)
for eg in pd.unique(df.eg):
measure_slice = data[egs == eg]
measure_slice_index = np.where(egs == eg)[0]
measure_slice_sorted = np.sort(measure_slice, axis=0)
            if reverse_list[i]:
                measure_slice_sorted = measure_slice_sorted[::-1]
np.put(measure_col, measure_slice_index, measure_slice_sorted)
if add_columns:
col_name = measure + '_sort'
else:
col_name = measure
df[col_name] = measure_col
if measure in date_cols:
df[col_name] = pd.to_datetime(df[col_name].dt.date)
return df
def sort_and_rank(df,
                  col,
                  tiebreaker1=None,
                  tiebreaker2=None,
                  reverse=False):
    '''Sort a dataframe by a specified attribute and insert a column indicating
the resultant ranking. Tiebreaker inputs select columns to be used for
secondary ordering in the event of value ties. Reverse ordering may be
selected as an option.
inputs
df
input dataframe
col (string)
dataframe column to sort
tiebreaker1, tiebreaker2 (string(s))
second and third sort columns to break ties with primary col sort
reverse (boolean)
If True, reverses sort (descending values)
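
    Example (a sketch - the tiebreaker inputs are optional):

    .. code:: python

        # rank by ldate, breaking ties with doh and then lname
        sort_and_rank(df, 'ldate', tiebreaker1='doh', tiebreaker2='lname')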
'''
col_list = [col]
if tiebreaker1:
col_list.append(tiebreaker1)
if tiebreaker2:
col_list.append(tiebreaker2)
if not reverse:
df.sort_values(col_list, inplace=True)
else:
df.sort_values(col_list, ascending=False, inplace=True)
df[col + '_rank'] = np.arange(len(df), dtype=float) + 1
return df
def names_to_integers(names,
                      leading_precision=5,
                      normalize_alpha=True):
    '''convert a list or series of string names (e.g. last names) into integers
for numerical sorting
Returns tuple (int_names, int_range, name_percentages)
inputs
names
List or pandas series containing strings for conversion to integers
leading_precision
Number of characters to use with full numeric precision, remainder
of characters will be assigned a rounded single digit between
0 and 9
normalize_alpha
If True, insert 'aaaaaaaaaa' and 'zzzzzzzzzz' as bottom and
top names. Otherwise, bottom and top names will be calculated
from within the names input
output
1. an array of the name integers
2. the range of the name integers,
3. an array of corresponding percentages for each name integer
relative to the range of name integers array
Note: This function demonstrates the possibility of constructing
a list using any type or combination of attributes.
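
    Example (a sketch):

    .. code:: python

        int_names, int_range, pcnts = names_to_integers(master.lname)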
'''
    if isinstance(names, pd.Series):
names = list(names.str.lower())
else:
names = list(pd.Series(names).str.lower())
if normalize_alpha:
names.extend(['aaaaaaaaaa', 'zzzzzzzzzz'])
    # use a numeric array here - np.zeros_like on a list of strings
    # would create a fixed-width string array and silently truncate
    # the converted integer strings
    int_names = np.zeros(len(names))
max_str_len = len(max(names, key=len))
alpha_numer = {'a': '01', 'b': '04', 'c': '08', 'd': '12', 'e': '16',
'f': '20', 'g': '24', 'h': '28', 'i': '32', 'j': '36',
'k': '40', 'l': '44', 'm': '48', 'n': '52', 'o': '56',
'p': '60', 'q': '64', 'r': '68', 's': '72', 't': '76',
'u': '80', 'v': '83', 'w': '87', 'x': '91', 'y': '95',
'z': '99'}
    for j, name in enumerate(names):
        num_convert = ''
        for i in range(max_str_len):
            if i < leading_precision:
                try:
                    num_convert += alpha_numer[name[i]]
                except (IndexError, KeyError):
                    # name shorter than max_str_len, or a non-letter
                    num_convert += '00'
            else:
                try:
                    num_convert += str(int(int(alpha_numer[name[i]]) * .1))
                except (IndexError, KeyError):
                    num_convert += '0'
        int_names[j] = int(num_convert)
int_names = int_names.astype(float)
name_min = np.amin(int_names)
name_max = np.amax(int_names)
int_range = name_max - name_min
name_percentages = (int_names - name_min) / int_range
if normalize_alpha:
int_names = int_names[:-2]
name_percentages = name_percentages[:-2]
return int_names, int_range, name_percentages
def find_row_orphans(base_df,
                     compare_df,
                     col,
                     ignore_case=True,
                     print_output=False):
'''Given two columns (series) with the same column label in separate pandas
dataframes, return values which are unique to one or the other column,
not common to both series.
Will also work with dataframe indexes.
Returns tuple (base_loners, compare_loners) if not print_output. These are
dataframes with the series orphans.
Note: If there are orphans found that have identical values, they will
both be reported. However, currently the routine will only find the first
corresponding index location found and report that location for
both orphans.
inputs
base_df
first dataframe to compare
compare_df
second dataframe to compare
col
        column label of the series to compare. If 'index' is given,
        the routine will compare the dataframe indexes instead.
ignore_case
convert col to lowercase prior to comparison
print_output
print results instead of returning results
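
    Example (a sketch - empkey is a typical master list column):

    .. code:: python

        base_loners, compare_loners = find_row_orphans(df1, df2, 'empkey')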
'''
col = col.lower()
base_df.columns = map(str.lower, base_df.columns)
compare_df.columns = map(str.lower, compare_df.columns)
if col == 'index':
base_series = base_df.index
compare_series = compare_df.index
else:
if (col not in base_df) or (col not in compare_df):
print(col + ' is not a column in both dataframes...')
return
else:
base_series = base_df[col]
compare_series = compare_df[col]
if ignore_case:
try:
base_series = base_series.str.lower()
compare_series = compare_series.str.lower()
base_df[col] = base_series
compare_df[col] = compare_series
        except AttributeError:
            # non-string values - case conversion is not applicable
            pass
base_orphans = list(base_series[~base_series.isin(compare_series)])
compare_orphans = list(compare_series[~compare_series.isin(base_series)])
base_col_name = 'base_orphans'
compare_col_name = 'compare_orphans'
base_loners = pd.DataFrame(base_orphans,
columns=[base_col_name])
compare_loners = pd.DataFrame(compare_orphans,
columns=[compare_col_name])
def find_label_locs(df, orphans):
loc_list = []
for orphan in orphans:
loc_list.append(df.index.get_loc(orphan))
return loc_list
def find_val_locs(df, orphans, col):
loc_list = []
for orphan in orphans:
if df[col].dtype == 'datetime64[ns]':
loc_list.append(list(df[col]).index(pd.to_datetime(orphan)))
else:
loc_list.append(list(df[col]).index(orphan))
return loc_list
if base_orphans:
if col == 'index':
base_loners['index_loc'] = find_label_locs(base_df, base_orphans)
else:
base_loners['index_loc'] = find_val_locs(base_df,
base_orphans, col)
if compare_orphans:
if col == 'index':
compare_loners['index_loc'] = find_label_locs(compare_df,
compare_orphans)
else:
compare_loners['index_loc'] = find_val_locs(compare_df,
compare_orphans,
col)
if print_output:
print('BASE:\n', base_loners, '\nCOMPARE:\n', compare_loners)
else:
return base_loners, compare_loners
def compare_dataframes(base, compare,
                       return_orphans=True,
                       ignore_case=True,
                       print_info=False,
                       convert_np_timestamps=True):
"""
    Compare the values of two dataframes and report any unequal values
    in a returned differential dataframe.
    Values are compared only by index and column label, not by
    position. Therefore, the only values compared are those within
    common index rows and common columns. The routine will report the
    common columns and any unique index rows when the print_info
    option is selected (True).
Inputs are pandas dataframes and/or pandas series.
This function works well when comparing initial data lists, such as
those which may be received from opposing parties.
If return_orphans, returns tuple (diffs, base_loners, compare_loners),
else returns diffs.
diffs is a differential dataframe.
inputs
base
baseline dataframe or series
compare
dataframe or series to compare against the baseline (base)
return_orphans
separately calculate and return the rows which are unique to
base and compare
ignore_case
convert the column labels and column data to be compared to
lowercase - this will avoid differences detected based on string
case
print_info
option to print out to console verbose statistical information
and the dataframe(s) instead of returning dataframe(s)
convert_np_timestamps
        numpy returns datetime64 objects when the source is a
        date-only datetime object; this option will convert back to a
        date-only object for comparison
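
    Example (a sketch):

    .. code:: python

        diffs, base_loners, comp_loners = compare_dataframes(df1, df2)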
"""
    try:
        assert (isinstance(base, (pd.DataFrame, pd.Series)) and
                isinstance(compare, (pd.DataFrame, pd.Series)))
except AssertionError:
print('Routine aborted. Inputs must be a pandas dataframe or series.')
return
if isinstance(base, pd.Series):
base = pd.DataFrame(base)
if isinstance(compare, pd.Series):
compare = pd.DataFrame(compare)
common_rows = list(base.index[base.index.isin(compare.index)])
if print_info:
print('\nROW AND INDEX INFORMATION:\n')
print('base length:', len(base))
print('comp length:', len(compare))
print('common index count:', len(common_rows), '\n')
# orphans section---------------------------------------------------------
if return_orphans:
base_orphans = list(base.index[~base.index.isin(compare.index)])
compare_orphans = list(compare.index[~compare.index.isin(base.index)])
base_col_name = 'base_orphans'
compare_col_name = 'compare_orphans'
base_loners = pd.DataFrame(base_orphans,
columns=[base_col_name])
compare_loners = pd.DataFrame(compare_orphans,
columns=[compare_col_name])
def find_label_locs(df, orphans):
loc_list = []
for orphan in orphans:
loc_list.append(df.index.get_loc(orphan))
return loc_list
if base_orphans:
base_loners['index_loc'] = find_label_locs(base, base_orphans)
if print_info:
print('BASE LONERS (rows, by index):')
print(base_loners, '\n')
else:
if print_info:
                print('There are no unique index rows in the base '
                      'input vs. the compare input.\n')
if compare_orphans:
compare_loners['index_loc'] = find_label_locs(compare,
compare_orphans)
if print_info:
print('COMPARE LONERS (rows, by index):')
print(compare_loners, '\n')
else:
if print_info:
                print('There are no unique index rows in the compare '
                      'input vs. the base input.\n')
# -----------------------------------------------------------------------
base = base.loc[common_rows].copy()
compare = compare.loc[common_rows].copy()
unequal_cols = []
equal_cols = []
if ignore_case:
base.columns = map(str.lower, base.columns)
compare.columns = map(str.lower, compare.columns)
common_cols = list(base.columns[base.columns.isin(compare.columns)])
base_only_cols = list(base.columns[~base.columns.isin(compare.columns)])
comp_only_cols = list(compare.columns[~compare.columns.isin(base.columns)])
oddballs = base_only_cols.copy()
oddballs.extend(comp_only_cols)
all_columns = common_cols.copy()
all_columns.extend(oddballs)
if print_info:
same_col_list = []
print('\nCOMMON COLUMN equivalency:\n')
for col in common_cols:
if ignore_case:
try:
base[col] = base[col].str.lower()
compare[col] = compare[col].str.lower()
            except AttributeError:
                # non-string column - case conversion is not applicable
                pass
same_col = base[col].sort_index().equals(compare[col].sort_index())
if print_info:
same_col_list.append(same_col)
if not same_col:
unequal_cols.append(col)
else:
equal_cols.append(col)
base = base[unequal_cols]
compare = compare[unequal_cols]
if print_info:
same_col_df = pd.DataFrame(list(zip(common_cols, same_col_list)),
columns=['common_col', 'equivalent?'])
same_col_df.sort_values(['equivalent?', 'common_col'], inplace=True)
same_col_df.reset_index(drop=True, inplace=True)
print(same_col_df, '\n')
print('\nCOLUMN INFORMATION:')
print('\ncommon columns:\n', common_cols)
print('\ncommon and equal columns:\n', equal_cols)
print('\ncommon but unequal columns:\n', unequal_cols)
print('\ncols only in base:\n', base_only_cols)
print('\ncols only in compare:\n', comp_only_cols, '\n')
col_df = pd.DataFrame(index=[all_columns])
        # build the categorized column dataframe without using eval
        column_results = [('equal_cols', equal_cols),
                          ('unequal_cols', unequal_cols),
                          ('common_cols', common_cols),
                          ('base_only_cols', base_only_cols),
                          ('comp_only_cols', comp_only_cols),
                          ('all_columns', all_columns)]
        for result_name, result_list in column_results:
            i = 0
            col_arr = np.empty_like(all_columns)
            for name in all_columns:
                if name in result_list:
                    col_arr[i] = name
                    i += 1
            col_df[result_name] = col_arr
col_df.sort_values(['unequal_cols', 'equal_cols'], inplace=True)
col_df.reset_index(drop=True, inplace=True)
col_df.rename(columns={'unequal_cols': 'not_equal',
'base_only_cols': 'base_only',
'comp_only_cols': 'comp_only'}, inplace=True)
print('\nCATEGORIZED COLUMN DATAFRAME:\n')
print(col_df, '\n')
zipped = []
col_counts = []
with warnings.catch_warnings():
warnings.simplefilter('ignore', category=FutureWarning)
for col in base:
base_np = base[col].values
compare_np = compare[col].values
            try:
                unequal = np.not_equal(base_np, compare_np)
            except Exception:
                # the element-wise comparison failed (for example,
                # incomparable types) - report any duplicate values
                # which may be the cause, then skip this column
                try:
                    mask = base.duplicated(subset=col, keep=False)
                    dups = list(base[mask][col])
                    print('error, duplicate values:')
                    print(pd.DataFrame(dups, columns=['dups']))
                except Exception:
                    pass
                col_counts.append(0)
                continue
            row_ = np.where(unequal)[0]
index_ = base.iloc[row_].index
col_ = np.array([col] * row_.size)
base_ = base_np[unequal]
compare_ = compare_np[unequal]
            if base[col].dtype == 'datetime64[ns]' and convert_np_timestamps:
                try:
                    base_ = base_.astype('M8[D]')
                    compare_ = compare_.astype('M8[D]')
                except Exception:
                    pass
zipped.extend(list(zip(row_, index_, col_, base_, compare_)))
col_counts.append(row_.size)
diffs = pd.DataFrame(
zipped, columns=['row', 'index', 'column', 'base', 'compare'])
diffs.sort_values('row', inplace=True)
diffs.reset_index(drop=True, inplace=True)
if print_info:
print('\nDIFFERENTIAL DATAFRAME:\n')
print(diffs)
print('\nSUMMARY:\n')
        print('{!r} total differences found in '
              'common rows and columns\n'.format(len(zipped)))
        if len(zipped) == 0:
            print('Comparison complete, dataframes are equivalent.\n'
                  'Index and Column order may be different\n')
else:
print('Breakdown by column:\n',
pd.DataFrame(list(zip(base.columns, col_counts)),
columns=['column', 'diff_count']),
'\n')
else:
if return_orphans:
return diffs, base_loners, compare_loners
else:
return diffs
# FIND LABEL LOCATIONS (index input)
def find_index_locs(df,
                    index_values):
'''Find the pandas dataframe index location of an array-like input
of index labels.
Returns a list containing the index location(s).
inputs
df
dataframe - the index_values input is a subset of the
dataframe index.
index_values
array-like collection of values which are a subset of the dataframe
index
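
    Example (a sketch - the empkey index labels shown are hypothetical):

    .. code:: python

        find_index_locs(df, [10101, 10205, 10350])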
'''
loc_list = []
for val in index_values:
loc_list.append(df.index.get_loc(val))
return loc_list
# FIND SERIES VALUE INDEX LOCATIONS
def find_series_locs(df,
                     series_values,
                     column_label):
'''Find the pandas dataframe index location of an array-like input
of series values.
Returns a list containing the index location(s).
inputs
df
dataframe - the series_values input is a subset of one of the
dataframe columns.
series_values
array-like collection of values which are a subset of one of
        the dataframe columns (the column_label input)
column_label
the series within the pandas dataframe containing the series_values
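
    Example (a sketch - the lname values shown are hypothetical):

    .. code:: python

        find_series_locs(df, ['smith', 'jones'], 'lname')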
'''
loc_list = []
for val in series_values:
if df[column_label].dtype == 'datetime64[ns]':
loc_list.append(list(df[column_label]).index(pd.to_datetime(val)))
else:
loc_list.append(list(df[column_label]).index(val))
return loc_list
def test_df_col_or_idx_equivalence(df1,
                                   df2,
                                   col=None):
'''check whether two dataframes contain the same elements (but not
necessarily in the same order) in either the indexes or a selected column
inputs
df1, df2
the dataframes to check
col
if not None, test this dataframe column for equivalency, otherwise
test the dataframe indexes
Returns True or False
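
    Example (a sketch):

    .. code:: python

        test_df_col_or_idx_equivalence(df1, df2, col='empkey')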
'''
    if not col:
        # test both directions - a one-way membership test alone would
        # not detect values present only in df2
        result = (df1.index.size == df2.index.size and
                  all(np.in1d(df1.index, df2.index,
                              assume_unique=True,
                              invert=False)))
    else:
        result = (df1[col].size == df2[col].size and
                  all(np.in1d(df1[col], df2[col],
                              assume_unique=False,
                              invert=False)))
return result