Prism.js

Raw Power - Data Processor Pt. 2

"Bombay"

 

TL;DR 

This blog post explains DataProcessor, a Python class designed for processing raw data: loading files, checking data types, dropping columns, identifying categorical data, and managing datetime indexes. It emphasizes flexibility, clean error handling, and modular design using libraries likpandas, numpy, and scikit-learn. The focus is on preparing data for analysis or visualization — powerful stuff for any data judoka! 

 

Introduction - Data Processor Pt. 2

From the previous post  — Class of the Process —  we introduce the idea of a class that could handle raw data and prepare it for further analysis.  This includes data ingestion, cleaning, transformation, and visualization support, making the data ready to use!

Originally, I intended to present the whole concept in one article, but despite my efforts, it became too large.  Both the article and the code!

You can find full code on my Github page.  Here, I'll focus on breaking down the main ideas.

 

Initialization & Data Loading

Before initializing the class, we need to import the required libraries.

Libraries Used

This project uses the following Python libraries:

  • pandas — for data manipulation and analysis
  • numpy — for numerical operations and array handling
  • pathlib — for easy and readable file system paths
  • datetime — for working with dates and times
  • scipy.stats.zscore — to detect outliers using Z-score method
  • scikit-learn(sklearn) — for feature scaling (StandardScaler, MinMaxScaler, RobustScaler)
  • matplotlib.pyplot — for creating visualizations
  • seaborn — for statistical plotting and heatmaps
With this clarified, let's look at the class's initialization and data loading methods.

1. __init__() Constructor 

The __init__ method iinitializes the class, either by loading data from a file (filepath) or by accepting a dataset directly (dataset).

  • Parameters:
    • (filepath) (optional): Path to a data file.
    • (dataset) (optional): A data structure compatible with pandas.DataFrame (e.g. a list of dictionaries or another structure)
  • Behavior
    • If a dataset is provided, it is immediately converted into a pandas.Dataframe, and the filepath is ignored. 
    • If no dataset is provided but a valid filepath is given, the object is prepared to later load data from that file (though the file loading itself is not shown in this snippet).
    • If neither a dataset nor a filepath is provided, the constructor raises a ValueError to prevent the creation of an uninitialized or meaningless object. 
Python Code 
class DataProcessor:
    def __init__(self, filepath=None, dataset=None):
        self.df = None
        self.filepath = filepath

        if dataset is not None:
            self.df = pd.DataFrame(dataset)
            self.filepath = None
        elif not filepath:
            raise ValueError("Either 'filepath' or 'dataset' must be provided.")

Why this matters:

  • This pattern allows flexibility — you can either pass raw data directly for quick testing or prototyping, or use a file for larger or more persistent datasets.

  • It enforces good practice by ensuring the object always has valid data to work with.

2. load() method

The load() method is designed to intelligently read different types of data files based on the file extension. It supports: 

  • .csv – Loaded using pandas.read_csv()
  • .xlsx or .xls – Loaded using pandas.read_excel()
  • .json – Loaded using pandas.read_json()
 If the file format isn't one of these, it raises a ValueError with a clear message.
  • How it works

  1. Checks the file extension using .endswith() to determine the appropriate pandas function to use.

  2. Reads the file into the self.df() attribute (a pandas.DataFrame).
  3. Prints a success message on successful load.

  4. Returns self to allow method chaining if needed (e.g., obj.load().process()).

  5. Handles errors gracefully: If anything goes wrong during file loading (missing file, corrupt data, unsupported format), it raises a RuntimeError with the original error message included.

Python Code 
def load(self):
        try:
            if self.filepath.endswith((".xlsx", ".xls")):
                self.df = pd.read_excel(self.filepath)
            elif self.filepath.endswith(".csv"):
                self.df = pd.read_csv(self.filepath)
            elif self.filepath.endswith(".json"):
                self.df = pd.read_json(self.filepath)
            else:
                raise ValueError(
                    "Unsupported file format. Supported: .csv, .xlsx, .xls, .json"
                )

            print("Data loaded successfully.")
            return self

        except Exception as e:
            raise RuntimeError(
                f"Your file wasn't properly loaded !! Reason: {str(e)}"
            )

Why this matters:

  • Prevents crashes from unsupported formats or bad data.
  • Gives users meaningful feedback.
  • Keeps your code clean and modular by separating loading logic from initialization.
 

Data Overview & Validation

After initialization and data loading, our dataset is already a data object.  From there, we can proceed with various scans and tests to inspect it further.  Methods such as check_dtypes(), drop_columns(),and  check_categorical_columns() help us understand not only the nature of the data but also allow us to retain only the information relevant to the project.

Python Code 
def check_dtypes(self):
        print("Data types:")
        print(self.df.dtypes)
        return self.df.dtypes

check_dtypes(): Simple and useful.  This method inspects the data types of each column in the DataFrame (self.df).  Understanding data types is crucial before performing operations such as filtering, analysis, or model training, as it ensures the data is processed correctly.

Python Code 
def drop_columns(self, columns_to_drop):
    if isinstance(columns_to_drop, str):
        columns_to_drop = [columns_to_drop]
    elif not isinstance(columns_to_drop, (list, tuple, set)):
        raise TypeError(
            "columns_to_drop must be a string or list/tuple/set of strings."
        )

    existing_cols = set(self.df.columns)
    to_drop = [col for col in columns_to_drop if col in existing_cols]
    not_found = [col for col in columns_to_drop if col not in existing_cols]

    if not to_drop:
        print("No matching columns found to drop.")
    else:
        self.df.drop(columns=to_drop, inplace=True)
        print(f"Dropped columns: {to_drop}")

    if not_found:
        print(
            f"These columns were not found in the DataFrame and were skipped: {not_found}"
        )

    return self

drop_columns(): Robust and user-friendly — this method handles various input types, checks for missing columns, and provides informative messages.  It prevents errors caused by attempting to drop non-existent columns while supporting flexible input formats such as strings, lists, sets, and tuples.  Additionally, the friendly messages enhance debugging and usability, and support for method chaining helps keep your data pipeline clean and efficient!

Python Code 

def check_categorical_columns(self):
    categorical_cols = self.df.select_dtypes(include=["object", "category"]).columns
    if categorical_cols.empty:
        print("No categorical columns found.")
    else:
        print(f"Categorical columns: {list(categorical_cols)}")
    return list(categorical_cols)

check_categorical_columns():  A clear and efficient method for identifying columns in your DataFrame that contain categorical data.  This is especially useful prior to tasks such as encoding, grouping, or analysis.

 

Index Handling

With methods like set_index_date() and check_index_is_datetime(), you can convert a specified column to date or datetime format and set it as the DataFrame’s index.  These methods ensure the index is valid, clean, and properly formatted — and also confirm that the index has been successfully created.  Using a datetime index is essential when working with time-based data, as it unlocks powerful features in pandas:

  • .resample() — Aggregate data by days, months, etc.
  • .loc[] with slices — Easily filter by date ranges df.loc['2025-09-30']   
  • Time-aware plotting — Clean, automatic x-axis formatting for dates
  • Time-based joins — Align time series from different sources
  • Handling gaps — Detect and fill missing time periods

Without a proper date or datetime index, these features break down and working with time-based data becomes significantly more complex. 

Python Code 
def set_index_date(
        self,
        index_column,
        log_invalid=False,
        log_path="invalid_datetime_rows.csv",
        check_index=True,
        force_plain_date=False,
    ):

        # 0. If already index, skip re-indexing
        if self.df.index.name == index_column:
            print(f"Column '{index_column}' is already set as index.")
            if check_index:
                self.check_index_is_datetime()
            return self

        # 1. Check if column exists
        if index_column not in self.df.columns:
            raise ValueError(f"Column '{index_column}' not found in the DataFrame.")

        # 2. Convert to datetime if not already
        if not pd.api.types.is_datetime64_any_dtype(self.df[index_column]):
            self.df[index_column] = pd.to_datetime(
                self.df[index_column], errors="coerce"
            )
            print(f"Column '{index_column}' converted to datetime.")

        # 3. Check for all NaT values
        if self.df[index_column].isna().all():
            print(f"All values in '{index_column}' are NaT. Cannot set as index.")
            return self

        # 4. Log invalid datetime values
        invalid_rows = self.df[self.df[index_column].isna()]
        num_invalid = len(invalid_rows)
        if num_invalid > 0:
            print(
                f"{num_invalid} rows in '{index_column}' could not be converted to datetime (set as NaT)."
            )
            if log_invalid:
                invalid_rows.to_csv(log_path, index=False)
                print(f"Invalid datetime rows saved to: {log_path}")

        # 5. Format date vs datetime
        non_na_values = self.df[index_column].dropna()
        sample_value = non_na_values.iloc[0]

        if isinstance(sample_value, date) and not isinstance(sample_value, datetime):
            print(f"Column '{index_column}' is already in date format (YYYY-MM-DD).")
        else:
            if force_plain_date:
                print(
                    f"force_plain_date=True — converting '{index_column}' to plain date."
                )
                self.df[index_column] = self.df[index_column].dt.date
            else:
                if hasattr(sample_value, "time") and sample_value.time() == time(0, 0):
                    print(
                        f"Column '{index_column}' appears to be date-only — converting to plain date."
                    )
                    self.df[index_column] = self.df[index_column].dt.date
                else:
                    print(
                        f"Column '{index_column}' contains time — normalizing to 00:00:00."
                    )
                    self.df[index_column] = self.df[index_column].dt.normalize()

        # 6. Warn about duplicate index values
        if self.df[index_column].duplicated().any():
            print(
                f"Warning: '{index_column}' contains duplicate values — consider handling them before indexing."
            )

        # 7. Set the index
        if index_column not in self.df.columns:
            print(f"Column '{index_column}' no longer exists. Cannot set as index.")
            return self

        self.df.set_index(index_column, inplace=True)
        print(f"Index set to column: '{index_column}'")

        # 8. Check if index is datetime or date
        if check_index:
            self.check_index_is_datetime()

        return self

 

Missing Values & Duplicates

The following methods,  check_missing(), handle_missing_values(), inspect_duplicates(), handle_duplicates(), log_duplicates()help automate and simplify essential data cleaning tasks such as detecting, handling, and logging missing or duplicate values in a DataFrame. For example:  

Python Code 
def inspect_duplicates(self, subset=None, keep=False, return_rows=False):
        dups = self.df[self.df.duplicated(subset=subset, keep=keep)]
        num_dups = len(dups)

        if num_dups > 0:
            print(f"Found {num_dups} duplicate row(s).")
        else:
            print("No duplicate rows found.")

        if return_rows:
            return num_dups, dups
        else:
            return num_dups

This method detects duplicate rows based on all columns or a specified subset.  It also reports how many duplicate rows exist and can optionally return the actual duplicate rows when return_rows=True).

Data quality issues are one of the biggest blockers in data science workflows. 

To keep this post concise and focused you can review the rest of these methods on my Github page.

 

Outlier Detection & Removal

Outliers can skew statistical analyses, machine learning models, and visualizations. These methods offer efficient tools for detecting and removing outliers from your pandas DataFrame using either Z-Score or Interquartile Range (IQR) technique. 

The check_outliers() method identifies potential outliers in numeric columns using the Z-Score method.  It calculates Z-scores for all numeric columns and flags values with an absolute Z-score  greater than  z_thresh (default=2 for this implementation).  The method prints the number of outliers per column  and returns a total count. 

At this point, a visual inspection of the data before and after the cleaning process can be a helpful way to assess how much data was removed. This can be done using box plot or histogram comparisons via the visualize_outliers_boxplot() and/or visualize_outliers_histogram() methods.

"Distribution of daily total sales before and after removing outliers using the Z-score method." 

 

NOTE: 

Should you remove records based on Z-Score Outliers in Total Sales?

In short, it depends on your goal. 

Z-score outlier detection assumes your data is approximately normally distributed. It flags points that lie beyond a threshold (e.g., ±3 standard deviations) from the mean.

However, it may not be advisable to apply this method to the total sales variable.

Sales data is usually right-skewed — a few days might have unusually high sales, which are not errors, but important real events (e.g., Black Friday, product launches). 

If your sales data isn’t approximately normal (and it often isn’t), Z-score thresholds aren't reliable. You might remove legitimate, meaningful data.

However, it might be acceptable to do so if:

  • You’re trying to fit a model that is highly sensitive to outliers
    (e.g., linear regression — outliers can drastically skew coefficients)

or

  • You have good reason to believe the outliers are due to errors or anomalies
    (e.g., a data collection glitch, duplicate entry, or a misrecorded transaction)

 

Why It's Useful

The check_outliers() method quickly identifies how many and which columns may contain outlier values — providing a fast and informative way to assess potential issues in your data!

If this method feels too restrictive, you can use remove_outliers_zscore(),  which removes entire rows where any numeric value exceeds the specified Z-score threshold.

Why that's Useful?

This approach is especially helpful for users who want to globally clean the dataset across all numerical columns — ensuring no extreme values are left that could negatively impact models or analyses.

 

An alternative:  IQR

remove_outliers_iqr()

This method is especially useful for non-Gaussian distributions (non-normal) where Z-score–based detection may not be effective.  One of its key features is the use of outlier bounds, which are defined using the Interquartile Range (IQR) multiplied by a user-defined factor (via iqr_multiplier(), default = 1.5).

This method is ideal for removing extreme values from features like income, age, or transaction amounts — common sources of skewed distributions in real-world datasets. 

  

Utility & Finishing Steps 

run_all_checks()  — This method runs a comprehensive suite of data quality checks, allowing you to quickly assess the overall health of your dataset. 

Python Code 
def run_all_checks(self):
        print("\n🔹 Running data quality checks...")
        self.check_missing()
        self.inspect_duplicates()
        self.check_dtypes()
        self.check_index_is_datetime()
        return self

 

Perfect for early-stage Exploratory Data Analysis (EDA), especially when working with unfamiliar or messy datasets. 

Depending on the user's needs, the order of the methods can be re-arranged as needed.

save()  —  Last but not least, this flexible data export method allows you to save your cleaned or processed DataFrame to a file after all transformations are complete. It supports multiple formats, including CSV and Excel (XLSX), making it easy to integrate into various workflows.

It’s useful at the end of your pipeline to persist results for sharing, reporting, or further analysis — ensuring your cleaned data is ready for the next stage.

 

Final thoughts

In workflows with tight schedules and demanding deadlines, having an organized mindset is a valuable asset.  Building a pipeline like this was not only creative but also a genuinely enjoyable experience for me.

Of course, this class can be further enhanced and extended — but for now, this is pretty much ...
  The DataProcessor
 !

As mentioned earlier, the full code and detailed documentation for the methods are available in my GitHub project DS/ML.

And since you've read so far, I hope you found this post interesting!

 

Coming Up Next 

In the next one, we'll see how to put it into action with real examples.

Almost real  !

 

 

 

 

Σχόλια