

data.etl

General ETL process to move data from the interim stage to the processed stage and add it to the deployed stage.

TextCleaner

Clean text data by removing stopwords and punctuation, collapsing newlines, tabs, and extra whitespace, and converting to lowercase.

Returns:

Type | Description
str  | cleaned text

Source code in src/data/etl.py
class TextCleaner:
    """Clean text data by removing stopwords, punctuation, new spaces / tabs and converting to lowercase.

    Returns:
        str: cleaned text
    """

    import string
    from typing import List  # noqa: UP035

    def __init__(self, additional_stopwords: List[str] = None):  # noqa: RUF013, UP006
        """Initialize the TextCleaner with optional additional stopwords.

        Args:
            additional_stopwords (List[str], optional): extra stopwords added to the NLTK defaults.
        """
        from nltk.corpus import stopwords

        # Default stopwords from NLTK
        self.stopwords = set(stopwords.words("english"))
        if additional_stopwords:
            self.stopwords.update(word.lower() for word in additional_stopwords)

    def remove_stopwords(self, text: str) -> str:
        """removes stopwords from a string

        Args:
            text (str): text with stopwords

        Returns:
            str: text without stopwords
        """
        words = text.split()
        filtered_words = [word for word in words if word.lower() not in self.stopwords]
        return " ".join(filtered_words)

    def remove_punctuation(self, text: str, punct: str = string.punctuation) -> str:
        """Remove punctuation from the text.

        Args:
            text (str): text with punctuation
            punct (str, optional): Punctuation to remove. Defaults to string.punctuation.

        Returns:
            str: text without punctuation
        """
        return "".join(char for char in text if char not in punct)

    def unicode(self, text: str) -> str:
        """Converts unicode characters to ASCII.

        Args:
            text (str): text with unicode characters

        Returns:
            str: converted text
        """
        import unidecode

        return unidecode.unidecode(text)

    def remove_newline_tabs_spaces(self, text: str) -> str:
        """Removes newlines and tabs from a string and replaces them with spaces

        Args:
            text (str): text with newlines and tabs

        Returns:
            str: cleaned text
        """
        # Replace newlines and tabs with spaces
        text = re.sub(r"[\n\t]+", " ", text)
        # Optionally remove extra spaces
        text = re.sub(r"\s+", " ", text).strip()
        return text

    def clean(self, text: str) -> str:
        """Apply all cleaning steps to the text.

        Args:
            text (str): unprocessed text

        Returns:
            str: processed text
        """
        text = self.unicode(text)
        text = self.remove_punctuation(text)
        text = self.remove_stopwords(text)
        text = self.remove_newline_tabs_spaces(text)
        return text.lower()
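
Example usage (a minimal sketch; the import path follows the "Source code in src/data/etl.py" note above, and it assumes the NLTK stopwords corpus and the unidecode package are installed):

import nltk

nltk.download("stopwords")  # one-time download of the NLTK stopword list

from src.data.etl import TextCleaner  # assumed import path

cleaner = TextCleaner(additional_stopwords=["etc"])  # extra stopword is purely illustrative
print(cleaner.clean("Café menus,\n\tthe PRICES & the notes"))
# -> "cafe menus prices notes"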

clean(text)

Apply all cleaning steps to the text.

Parameters:

Name | Type | Description      | Default
text | str  | unprocessed text | required

Returns:

Type | Description
str  | processed text

Source code in src/data/etl.py
def clean(self, text: str) -> str:
    """Apply all cleaning steps to the text.

    Args:
        text (str): unprocessed text

    Returns:
        str: processed text
    """
    text = self.unicode(text)
    text = self.remove_punctuation(text)
    text = self.remove_stopwords(text)
    text = self.remove_newline_tabs_spaces(text)
    return text.lower()

remove_newline_tabs_spaces(text)

Removes newlines and tabs from a string and replaces them with spaces

Parameters:

Name | Type | Description                 | Default
text | str  | text with newlines and tabs | required

Returns:

Type | Description
str  | cleaned text

Source code in src/data/etl.py
def remove_newline_tabs_spaces(self, text: str) -> str:
    """Removes newlines and tabs from a string and replaces them with spaces

    Args:
        text (str): text with newlines and tabs

    Returns:
        str: cleaned text
    """
    # Replace newlines and tabs with spaces
    text = re.sub(r"[\n\t]+", " ", text)
    # Optionally remove extra spaces
    text = re.sub(r"\s+", " ", text).strip()
    return text
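
For instance (a small illustration, assuming a TextCleaner instance named cleaner):

cleaner = TextCleaner()
print(cleaner.remove_newline_tabs_spaces("line one\n\tline   two\n"))
# -> "line one line two"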

remove_punctuation(text, punct=string.punctuation)

Remove punctuation from the text.

Parameters:

Name  | Type | Description                                             | Default
text  | str  | text with punctuation                                   | required
punct | str  | Punctuation to remove. Defaults to string.punctuation.  | string.punctuation

Returns:

Type | Description
str  | text without punctuation

Source code in src/data/etl.py
def remove_punctuation(self, text: str, punct: str = string.punctuation) -> str:
    """Remove punctuation from the text.

    Args:
        text (str): text with punctuation
        punct (str, optional): Punctuation to remove. Defaults to string.punctuation.

    Returns:
        str: text without punctuation
    """
    return "".join(char for char in text if char not in punct)

remove_stopwords(text)

removes stopwords from a string

Parameters:

Name | Type | Description         | Default
text | str  | text with stopwords | required

Returns:

Type | Description
str  | text without stopwords

Source code in src/data/etl.py
def remove_stopwords(self, text: str) -> str:
    """removes stopwords from a string

    Args:
        text (str): text with stopwords

    Returns:
        str: text without stopwords
    """
    words = text.split()
    filtered_words = [word for word in words if word.lower() not in self.stopwords]
    return " ".join(filtered_words)

apply_function_to_non_integer_columns(df, func)

Applies the given function to each column in the DataFrame that is object type dtype. Used for cleaning up text data in the DataFrame.

Parameters:

Name | Type      | Description                                        | Default
df   | DataFrame | The DataFrame to process.                          | required
func | callable  | The function to apply to each non-integer column.  | required

Returns:

Type      | Description
DataFrame | The DataFrame with non-integer columns processed by the given function.

Source code in src/data/etl.py
def apply_function_to_non_integer_columns(df: pd.DataFrame, func) -> pd.DataFrame:
    """
    Applies the given function to each column in the DataFrame that is object type dtype.
    Used for cleaning up text data in the DataFrame.

    Args:
        df (pd.DataFrame): The DataFrame to process.
        func (callable): The function to apply to each non-integer column.

    Returns:
        pd.DataFrame: The DataFrame with non-integer columns processed by the given function.
    """
    for col in df.columns:
        if df[col].dtype == "object":  # Check if column contains non-integer data
            print(f"Processing column: {col}")
            df[col] = df[col].apply(func)
    return df
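
A typical use is pairing this with TextCleaner.clean (a sketch; the column names are illustrative):

import pandas as pd

df = pd.DataFrame({"id": [1, 2], "notes": ["Some TEXT,\twith tabs", "The other entry!"]})
cleaner = TextCleaner()
df = apply_function_to_non_integer_columns(df, cleaner.clean)
print(df["notes"].tolist())
# -> ['text tabs', 'entry']   (the integer "id" column is left untouched)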

backup_file(path_csv_deployed, dst)

Copies a file for archiving.

Parameters:

Name              | Type | Description                          | Default
path_csv_deployed | str  | path of the file to back up          | required
dst               | str  | destination path to copy the file to | required
Source code in src/data/etl.py
def backup_file(path_csv_deployed: str, dst: str) -> None:
    """copies file for archives

    Args:
        path_csv_deployed (str): path of file to back up
        dst (str): path destination of file to save to
    """
    import shutil

    shutil.copy(path_csv_deployed, dst)
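
For example (both paths are illustrative):

backup_file("data/deployed/combined.csv", "data/archive/combined_backup.csv")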

csv_combine_proc(paths)

combines all datasets from the interim stage

Parameters:

Name  | Type | Description                 | Default
paths | list | paths from interim datasets | required

Returns:

Type      | Description
DataFrame | combined dataframe

Source code in src/data/etl.py
def csv_combine_proc(paths: list) -> pd.DataFrame:
    """combines all datasets from the interim stage

    Args:
        paths (list): paths from interim datasets

    Returns:
        pd.DataFrame: combined dataframe
    """
    import datetime

    import pandas as pd

    df = pd.DataFrame()
    for file in paths:
        filename = file.split("\\")[8].split(".")[0]
        print("Folder - " + filename)

        try:
            df_temp = pd.read_csv(file)
            df_temp["Source.Name.Interim"] = filename

            now = datetime.datetime.now(tz=datetime.timezone.utc).strftime("%Y-%m-%d")
            # date ran
            df_temp["proccessed"] = now
            df = pd.concat([df, df_temp], axis=0)

        except pd.errors.EmptyDataError:
            print("Folder " + filename + " is blank. Skipping file.")
    return df
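
A sketch of how this might be called (the directory layout is an assumption). Note that the function extracts the file name with file.split("\\")[8], so the absolute Windows paths must be deep enough for index 8 to land on the file name; adjust the index if your folder depth differs.

import glob

# Hypothetical interim folder, nine path components deep so split("\\")[8] is the file name
interim_paths = glob.glob(r"C:\Users\me\Documents\projects\example\data\interim\*.csv")
df = csv_combine_proc(interim_paths)
df.to_csv(r"C:\Users\me\Documents\projects\example\data\processed\combined.csv", index=False)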

csv_combine_update_dep(paths, path_csv_deployed, ref_col)

Combines datasets from the deployed and processed stages, removing rows from the deployed stage when a processed file has the same file name (this accounts for updated data in new files). CONFIRM the file names are the SAME; if they are not, data will be duplicated.

Parameters:

Name              | Type | Description                                     | Default
paths             | list | paths from processed datasets                   | required
path_csv_deployed | str  | path of deployed dataset                        | required
ref_col           | str  | reference column used to avoid duplicated data  | required

Returns:

Type      | Description
DataFrame | combined dataset from processed and existing deployed

Source code in src/data/etl.py
def csv_combine_update_dep(paths: list, path_csv_deployed: str, ref_col: str) -> pd.DataFrame:
    """combines datasets from deployed and processed stage removing
        duplicated files from deployed stage if processed file
        has same file name (considers for updated data in new files).
        CONFIRM file names are the SAME if not it will
        duplicate data.

    Args:
        paths (list): paths from processed datasets
        path_csv_deployed (str): path of deployed dataset
        ref_col (str): reference column used to avoid duplicated data

    Returns:
        pd.DataFrame: combined dataset from processed and existing deployed
    """
    import datetime

    import pandas as pd

    df_deployed = pd.read_csv(path_csv_deployed)

    for file in paths:
        filename = file.split("\\")[8]
        print(filename)

        df_temp = pd.read_csv(file)

        # date ran
        now = datetime.datetime.now(tz=datetime.timezone.utc).strftime("%Y-%m-%d")
        df_temp["deployed"] = now

        # v2
        # removes files with the same file path in deployed
        # if it reuploads it keeps one file (help with updates and duplicated files)
        filenames = df_deployed[ref_col]

        # unique set of deployed file names
        filenames = set(filenames)

        filenames_temp = df_temp[ref_col]

        # unique set of processed file names
        filenames_temp = set(filenames_temp)
        # find matching names
        updated = filenames.intersection(filenames_temp)
        print("Updating ...")
        print(updated)
        # remove matching file names based on the ref_col
        df_deployed = df_deployed.loc[~df_deployed[ref_col].isin(updated)]

        # combine datasets
        df_deployed = pd.concat([df_deployed, df_temp], axis=0)

    return df_deployed
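
Illustrative call (the paths are assumptions; ref_col is set here to the Source.Name.Interim column that csv_combine_proc adds, but any column that uniquely identifies a source file works):

import glob

base = r"C:\Users\me\Documents\projects\example\data"  # hypothetical project layout
processed_paths = glob.glob(base + r"\processed\*.csv")
df_dep = csv_combine_update_dep(
    processed_paths,
    path_csv_deployed=base + r"\deployed\deployed.csv",
    ref_col="Source.Name.Interim",
)
df_dep.to_csv(base + r"\deployed\deployed.csv", index=False)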

csv_dep_init(paths)

Initializes the dataset for the deployed stage from the processed stage.

Parameters:

Name  | Type | Description                   | Default
paths | list | paths from processed datasets | required

Returns:

Type      | Description
DataFrame | initialized dataset from the processed stage

Source code in src/data/etl.py
def csv_dep_init(paths: list) -> pd.DataFrame:
    """Initilizes dataset to next stage to deployment from proccessed

    Args:
        paths (list): paths from processed datasets

    Returns:
        pd.DataFrame: initialized dataset from the processed stage
    """
    import datetime

    import pandas as pd

    for file in paths:
        filename = file.split("\\")[8]
        print(filename)

        df_temp = pd.read_csv(file)

        # date ran
        now = datetime.datetime.now(tz=datetime.timezone.utc).strftime("%Y-%m-%d")
        df_temp["deployed"] = now

    return df_temp
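
Illustrative first-time initialization (paths are assumptions). Note that the loop keeps only the last file in paths, so this is typically called with a single processed file:

import glob

base = r"C:\Users\me\Documents\projects\example\data"  # hypothetical project layout
df_dep = csv_dep_init(glob.glob(base + r"\processed\combined.csv"))
df_dep.to_csv(base + r"\deployed\deployed.csv", index=False)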

datafile_path_finder(file_name)

Constructs a path by combining the parent directory of the current working directory with the 'data' folder and the provided file name. If no file name is provided, a default path is returned.

Parameters:

Name      | Type | Description                                                   | Default
file_name | str  | The name of the file for which the path is to be determined. | required

Returns:

Name   | Type | Description
df_dir | str  | The full path to the file, or an indication if no file name was provided.

Source code in src/data/etl.py
def datafile_path_finder(file_name: str) -> str:
    """
    Constructs a path by combining the parent directory of the current working directory with the 'data' folder
    and the provided file name. If no file name is provided, a default path is returned.

    Args:
        file_name (str): The name of the file for which the path is to be determined.

    Returns:
        df_dir (str): The full path to the file, or an indication if no file name was provided.
    """
    import glob
    import os

    main_dir = os.path.dirname(os.getcwd())
    rawdata_dir = os.path.join(main_dir, "data", file_name)
    df_dir = glob.glob(rawdata_dir)[0]
    return df_dir
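
For example, with the data folder sitting in the parent of the current working directory (file names are illustrative):

path_deployed = datafile_path_finder("deployed.csv")
path_interim = datafile_path_finder(r"interim\*.csv")  # glob patterns work too; only the first match is returned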

find_nan(df)

Counts the NaN values in each column of a dataframe.

Parameters:

Name | Type      | Description                        | Default
df   | DataFrame | dataframe to search for NaN values | required

Returns:

Type   | Description
Series | count of NaN values in each column

Source code in src/data/etl.py
def find_nan(df: pd.DataFrame) -> pd.Series:
    """finds all NaN values in a dataframe

    Args:
        df (pd.DataFrame): dataframe to search for NaN values

    Returns:
        pd.Series: count of NaN values in each column
    """

    return df.isnull().sum()
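
A quick check on a small frame:

import pandas as pd

df = pd.DataFrame({"a": [1, None, 3], "b": ["x", "y", None]})
print(find_nan(df))
# a    1
# b    1
# dtype: int64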