Skip to content

Data

rankmc.data.etl

General ETL process to move data from the interim stage to processed files and add data to the deployed stage

backup_file(path_csv_deployed, dst)

copies file for archives

Parameters:

Name Type Description Default
path_csv_deployed str

path of file to back up

required
dst str

path destination of file to save to

required
Source code in src/rankmc/data/etl.py
40
41
42
43
44
45
46
47
48
49
def backup_file(path_csv_deployed: str, dst: str) -> None:
    """Archive a copy of a file.

    Args:
        path_csv_deployed (str): path of the file to back up
        dst (str): destination path to save the copy to
    """
    from shutil import copy

    copy(path_csv_deployed, dst)

csv_combine_proc(paths)

combines all datasets from the interim stage

Parameters:

Name Type Description Default
paths list

paths from interim datasets

required

Returns:

Type Description
DataFrame

pd.DataFrame: combined dataframe

Source code in src/rankmc/data/etl.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def csv_combine_proc(paths: list) -> pd.DataFrame:
    """Combine all datasets from the interim stage into one DataFrame.

    Each file contributes its rows plus two provenance columns:
    ``Source.Name.Interim`` (the file's base name without extension) and
    ``proccessed`` (the UTC date the combine ran; spelling kept as-is for
    backward compatibility with existing downstream consumers).

    Args:
        paths (list): paths of interim CSV files

    Returns:
        pd.DataFrame: combined dataframe (empty if all files are blank)
    """
    import datetime

    import pandas as pd

    df = pd.DataFrame()
    # Stamp every file in this run with the same UTC date.
    now = datetime.datetime.now(tz=datetime.timezone.utc).strftime("%Y-%m-%d")
    for file in paths:
        # Portable base-name extraction. The previous file.split("\\")[8]
        # assumed a Windows path of a fixed depth and raised IndexError
        # for any other path; this handles both separators on any OS.
        basename = file.replace("\\", "/").rsplit("/", 1)[-1]
        filename = basename.rsplit(".", 1)[0]
        print("Folder - " + filename)

        try:
            df_temp = pd.read_csv(file)
        except pd.errors.EmptyDataError:
            # Zero-byte interim files occur routinely; skip them.
            print("Folder " + filename + " is blank. Skipping file.")
            continue

        df_temp["Source.Name.Interim"] = filename
        df_temp["proccessed"] = now  # date ran (column name kept as-is)
        df = pd.concat([df, df_temp], axis=0)
    return df

csv_combine_update_dep(paths, path_csv_deployed, ref_col)

Combines datasets from the deployed and processed stages, removing duplicated rows from the deployed stage if a processed file has the same file name (this accounts for updated data in new files). CONFIRM the file names are the SAME; if they are not, the data will be duplicated.

Parameters:

Name Type Description Default
paths list

paths from processed datasets

required
path_csv_deployed str

path of deployed dataset

required
ref_col str

reference column used to avoid duplicated data

required

Returns:

Type Description
DataFrame

pd.DataFrame: combined dataset from processed and existing deployed

Source code in src/rankmc/data/etl.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def csv_combine_update_dep(paths: list, path_csv_deployed: str, ref_col: str) -> pd.DataFrame:
    """Merge processed datasets into the deployed dataset, replacing rows.

    For each processed file, any deployed rows whose ``ref_col`` value also
    appears in that file are dropped before the file's rows are appended —
    i.e. a re-uploaded file overwrites its previously deployed rows
    (handles updates and duplicated files). CONFIRM the file names are the
    SAME; if they are not, the data will be duplicated.

    Args:
        paths (list): paths of processed CSV files
        path_csv_deployed (str): path of the deployed CSV dataset
        ref_col (str): reference column used to detect rows to replace

    Returns:
        pd.DataFrame: deployed dataset with the processed files merged in
    """
    import datetime

    import pandas as pd

    df_deployed = pd.read_csv(path_csv_deployed)

    # Single UTC run date stamped on every newly deployed row.
    now = datetime.datetime.now(tz=datetime.timezone.utc).strftime("%Y-%m-%d")

    for file in paths:
        # Portable base-name extraction. The previous file.split("\\")[8]
        # assumed a Windows path of a fixed depth and raised IndexError
        # for any other path; this handles both separators on any OS.
        filename = file.replace("\\", "/").rsplit("/", 1)[-1]
        print(filename)

        df_temp = pd.read_csv(file)
        df_temp["deployed"] = now  # date ran

        # v2: drop deployed rows whose ref_col value reappears in this
        # processed file, so a re-upload keeps only the newest copy.
        updated = set(df_deployed[ref_col]).intersection(set(df_temp[ref_col]))
        print("Updating ...")
        print(updated)
        df_deployed = df_deployed.loc[~df_deployed[ref_col].isin(updated)]

        # Append the processed rows to the (now de-duplicated) deployed set.
        df_deployed = pd.concat([df_deployed, df_temp], axis=0)

    return df_deployed

csv_dep_init(paths)

Initializes the dataset for the next stage, deployment, from processed

Parameters:

Name Type Description Default
paths list

paths from processed datasets

required

Returns:

Type Description
DataFrame

pd.DataFrame: dataset from the processed stage, initialized for deployment

Source code in src/rankmc/data/etl.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def csv_dep_init(paths: list) -> pd.DataFrame:
    """Initialize the deployed dataset from the processed stage.

    Reads every processed CSV, stamps it with the UTC run date in a
    ``deployed`` column, and returns all rows combined.

    Args:
        paths (list): paths of processed CSV files

    Returns:
        pd.DataFrame: dataset initialized for deployment (empty if
            ``paths`` is empty)
    """
    import datetime

    import pandas as pd

    # Single UTC run date stamped on every row.
    now = datetime.datetime.now(tz=datetime.timezone.utc).strftime("%Y-%m-%d")

    frames = []
    for file in paths:
        # Portable base-name extraction. The previous file.split("\\")[8]
        # assumed a Windows path of a fixed depth and raised IndexError
        # for any other path; this handles both separators on any OS.
        filename = file.replace("\\", "/").rsplit("/", 1)[-1]
        print(filename)

        df_temp = pd.read_csv(file)
        df_temp["deployed"] = now  # date ran
        frames.append(df_temp)

    # Previously only the LAST file was returned and every earlier read was
    # discarded (and an empty paths list raised NameError); combine all
    # files, matching csv_combine_proc / csv_combine_update_dep.
    return pd.concat(frames, axis=0) if frames else pd.DataFrame()

datafile_path_finder(file_name)

Constructs a path by combining the parent directory of the current working directory with the 'data' folder and the provided file name. If no file name is provided, a default path or an empty string can be returned.

Parameters:

Name Type Description Default
file_name str

The name of the file for which the path is to be determined.

required

Returns:

Name Type Description
df_dir str

The full path to the file, or an indication if no file name was provided.

Source code in src/rankmc/data/etl.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def datafile_path_finder(file_name: str) -> str:
    """Locate a file under the sibling ``data`` directory.

    The search pattern is ``<parent of cwd>/data/<file_name>`` resolved
    with :mod:`glob`, so ``file_name`` may contain wildcards; the first
    match is returned.

    Args:
        file_name (str): name (or glob pattern) of the file to find.

    Returns:
        str: full path of the first matching file.

    Raises:
        FileNotFoundError: if nothing under ``data`` matches.
    """
    import glob
    import os

    main_dir = os.path.dirname(os.getcwd())
    pattern = os.path.join(main_dir, "data", file_name)
    matches = glob.glob(pattern)
    if not matches:
        # Previously an opaque IndexError from [0]; fail with a clear error.
        raise FileNotFoundError(f"No file matching {pattern!r} in the data directory")
    return matches[0]

stratified_sample(df, col, n_samples)

Sample a DataFrame by a column, stratified by the column.

Parameters:

Name Type Description Default
df dataframe

dataframe to sample

required
col str

column to stratify by

required
n_samples int

number of samples to take

required

Returns:

Name Type Description
df dataframe

sampled dataframe

Source code in src/rankmc/data/etl.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def stratified_sample(df: pd.DataFrame, col: str, n_samples: int, seed: int = 42) -> pd.DataFrame:
    """Sample a DataFrame by a column, keeping all rows of the sampled groups.

    ``n_samples`` distinct values of ``col`` are drawn without replacement
    and every row belonging to a drawn value is kept.

    Args:
        df (pd.DataFrame): dataframe to sample
        col (str): column to stratify by
        n_samples (int): number of distinct ``col`` values to keep
        seed (int, optional): random seed. Defaults to 42, which reproduces
            the historical behavior exactly.

    Returns:
        pd.DataFrame: sampled dataframe

    Raises:
        ValueError: if ``n_samples`` exceeds the number of unique values.
    """
    import numpy as np

    # Use a local RandomState instead of np.random.seed(42) so the caller's
    # global RNG state is no longer clobbered as a side effect.
    # RandomState(42).choice(...) yields exactly the same draw as the old
    # np.random.seed(42) + np.random.choice(...), so results are unchanged.
    rng = np.random.RandomState(seed)
    sampled_ids = rng.choice(df[col].unique(), size=n_samples, replace=False)
    return df[df[col].isin(sampled_ids)]