Chapter 28: Deep Learning for Baseball

Deep learning has become an important tool in sports analytics, and baseball is no exception. In this chapter, we explore how neural networks can be applied to baseball prediction tasks, from pitch outcome forecasting to computer vision for swing analysis. We'll implement practical models in PyTorch and TensorFlow/Keras and walk through how each one applies to a realistic baseball scenario.

28.1 Introduction to Neural Networks for Baseball

Understanding Deep Learning in Baseball Context

Neural networks excel at finding complex, non-linear patterns in high-dimensional data—exactly what baseball analytics demands. Traditional statistical models often assume linear relationships or require manual feature engineering, but neural networks can automatically learn relevant features from raw data.
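
To make the contrast concrete, here is a minimal NumPy sketch on synthetic data. It assumes a hypothetical "contact value" that peaks at a launch angle of 15 degrees and falls off on either side; the variable names and the quadratic shape are illustrative assumptions, not a real Statcast relationship. A straight-line fit explains almost none of the variance, while a hand-engineered squared term recovers it. A neural network is meant to learn that kind of transformation from the data instead of having it supplied by hand.

```python
import numpy as np

rng = np.random.default_rng(42)

# Hypothetical data: value peaks at a 15-degree launch angle (assumed shape)
launch_angle = rng.uniform(-20, 50, 2000)
contact_value = 1.0 - ((launch_angle - 15) / 35) ** 2 + rng.normal(0, 0.05, 2000)

def r_squared(X, y):
    """Fit ordinary least squares with an intercept and return R^2."""
    design = np.column_stack([np.ones(len(X)), X])
    coef, *_ = np.linalg.lstsq(design, y, rcond=None)
    residuals = y - design @ coef
    return 1.0 - residuals.var() / y.var()

# Linear model on the raw feature
r2_linear = r_squared(launch_angle.reshape(-1, 1), contact_value)

# Same linear model after manually adding a squared term
engineered = np.column_stack([launch_angle, launch_angle ** 2])
r2_engineered = r_squared(engineered, contact_value)

print(f"R^2, raw launch angle:        {r2_linear:.3f}")
print(f"R^2, with squared term added: {r2_engineered:.3f}")
```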

Key Applications in Baseball:


  • Pitch outcome prediction
  • Player performance forecasting
  • Trajectory modeling
  • Computer vision for swing and pitch analysis
  • Injury risk assessment
  • Game state evaluation

Neural Network Fundamentals

A neural network consists of layers of interconnected nodes (neurons) that transform input data through learned weights and biases. For baseball applications, we typically use:

  1. Feedforward Networks: For tabular baseball statistics
  2. Recurrent Networks (RNNs/LSTMs): For sequential data like pitch sequences
  3. Convolutional Networks (CNNs): For image/video analysis
  4. Transformer Networks: For attention-based modeling of complex interactions
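
Before turning to a framework, it can help to see what "transforming input data through learned weights and biases" means numerically. The sketch below is a plain NumPy forward pass for a small feedforward network; the layer sizes, the random weights, and the two example batters are all hypothetical, and no training takes place.

```python
import numpy as np

def relu(z):
    """Element-wise rectified linear unit."""
    return np.maximum(z, 0.0)

def forward(x, params):
    """Run a batch through a feedforward network.

    params is a list of (weights, bias) pairs, one per layer.
    ReLU follows every layer except the last, which stays linear.
    """
    h = x
    for weights, bias in params[:-1]:
        h = relu(h @ weights + bias)
    out_weights, out_bias = params[-1]
    return h @ out_weights + out_bias

rng = np.random.default_rng(0)
n_features, n_hidden = 3, 4

# Randomly initialized parameters; training would adjust these values
params = [
    (rng.normal(0, 0.5, (n_features, n_hidden)), np.zeros(n_hidden)),
    (rng.normal(0, 0.5, (n_hidden, 1)), np.zeros(1)),
]

# Two hypothetical batters described by three standardized statistics
x = np.array([[0.5, -1.2, 0.3],
              [1.0,  0.0, -0.7]])

y_hat = forward(x, params)
print(y_hat.shape)  # (2, 1): one prediction per batter
```

Each layer is a matrix multiplication plus a bias, followed by a non-linearity; stacking several of them is what lets the network represent non-linear relationships.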

Basic Neural Network Architecture

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

# Simple feedforward network for baseball statistics
class BaseballNN(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size):
        super(BaseballNN, self).__init__()
        self.layers = nn.ModuleList()

        # Input layer
        self.layers.append(nn.Linear(input_size, hidden_sizes[0]))

        # Hidden layers
        for i in range(len(hidden_sizes) - 1):
            self.layers.append(nn.Linear(hidden_sizes[i], hidden_sizes[i+1]))

        # Output layer
        self.layers.append(nn.Linear(hidden_sizes[-1], output_size))

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        for layer in self.layers[:-1]:
            x = self.relu(layer(x))
            x = self.dropout(x)
        x = self.layers[-1](x)
        return x

# Example: Predicting batter performance
def create_batter_features():
    """Create synthetic batter statistics for demonstration"""
    np.random.seed(42)
    n_samples = 1000

    data = {
        'exit_velocity': np.random.normal(88, 8, n_samples),
        'launch_angle': np.random.normal(15, 10, n_samples),
        'sprint_speed': np.random.normal(27, 2, n_samples),
        'barrel_rate': np.random.uniform(0, 20, n_samples),
        'hard_hit_rate': np.random.uniform(30, 50, n_samples),
        'k_rate': np.random.uniform(15, 35, n_samples),
        'bb_rate': np.random.uniform(5, 15, n_samples),
        'pull_rate': np.random.uniform(35, 50, n_samples),
        'groundball_rate': np.random.uniform(35, 55, n_samples),
    }

    df = pd.DataFrame(data)

    # Target: wOBA (weighted on-base average)
    # Simplified linear formula. The intercept centers the synthetic target
    # near .320 so that the clip below only trims the tails.
    df['wOBA'] = (-0.134 +
                  0.003 * df['exit_velocity'] +
                  0.002 * df['launch_angle'] +
                  0.005 * df['barrel_rate'] +
                  0.003 * df['hard_hit_rate'] -
                  0.002 * df['k_rate'] +
                  0.004 * df['bb_rate'] +
                  np.random.normal(0, 0.02, n_samples))

    df['wOBA'] = df['wOBA'].clip(0.250, 0.450)

    return df

# Train a simple model
def train_batter_model():
    df = create_batter_features()

    # Prepare data
    feature_cols = ['exit_velocity', 'launch_angle', 'sprint_speed',
                    'barrel_rate', 'hard_hit_rate', 'k_rate',
                    'bb_rate', 'pull_rate', 'groundball_rate']

    X = df[feature_cols].values
    y = df['wOBA'].values

    # Split and scale
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    # Convert to PyTorch tensors
    X_train_tensor = torch.FloatTensor(X_train)
    y_train_tensor = torch.FloatTensor(y_train).reshape(-1, 1)
    X_test_tensor = torch.FloatTensor(X_test)
    y_test_tensor = torch.FloatTensor(y_test).reshape(-1, 1)

    # Create model
    model = BaseballNN(input_size=len(feature_cols), hidden_sizes=[64, 32, 16], output_size=1)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Training loop
    epochs = 100
    losses = []

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()

        # Forward pass
        predictions = model(X_train_tensor)
        loss = criterion(predictions, y_train_tensor)

        # Backward pass
        loss.backward()
        optimizer.step()

        losses.append(loss.item())

        if (epoch + 1) % 20 == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

    # Evaluation
    model.eval()
    with torch.no_grad():
        test_predictions = model(X_test_tensor)
        test_loss = criterion(test_predictions, y_test_tensor)
        print(f"\nTest Loss (MSE): {test_loss.item():.4f}")

    return model, scaler, losses

print("Training basic neural network for batter performance prediction...")
model, scaler, losses = train_batter_model()

28.2 Pitch Outcome Prediction with Deep Learning

Multi-Class Classification for Pitch Results

Predicting the result of a pitch (ball, called strike, swinging strike, foul, or ball in play) is a natural multi-class classification problem. We'll build a model that takes pitch characteristics, game context (the count), and batter-pitcher matchup data (handedness) as inputs.
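
A multi-class classifier of this kind usually ends in a softmax layer and is trained with cross-entropy on integer class labels. As a rough sketch of what those two pieces compute, the NumPy code below converts raw scores (logits) for the five outcomes into probabilities and evaluates the loss. The logits are made-up numbers, and the helper names are chosen for illustration rather than taken from any library.

```python
import numpy as np

OUTCOMES = ['ball', 'called_strike', 'swinging_strike', 'foul', 'in_play']

def softmax(logits):
    """Convert each row of raw scores into probabilities that sum to 1."""
    shifted = logits - logits.max(axis=-1, keepdims=True)  # numerical stability
    exp = np.exp(shifted)
    return exp / exp.sum(axis=-1, keepdims=True)

def sparse_categorical_crossentropy(probs, labels):
    """Mean negative log-probability assigned to the true integer labels."""
    eps = 1e-12  # guards against log(0)
    picked = probs[np.arange(len(labels)), labels]
    return float(-np.log(picked + eps).mean())

# Made-up logits for two pitches: one confident "ball", one with no preference
logits = np.array([[2.0, 0.5, 0.1, 0.3, -1.0],
                   [0.0, 0.0, 0.0, 0.0, 0.0]])
probs = softmax(logits)
labels = np.array([0, 4])  # true outcomes: ball, in_play

for name, p in zip(OUTCOMES, probs[0]):
    print(f"{name}: {p:.3f}")
print("loss:", round(sparse_categorical_crossentropy(probs, labels), 4))
```

A pitch with equal logits gets a probability of 0.2 for every outcome, so its loss is log(5), roughly 1.61. That figure is a useful reference point: a five-class model whose loss stays near it has learned little beyond guessing uniformly.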

# Pitch outcome prediction with TensorFlow/Keras
class PitchOutcomePredictor:
    def __init__(self):
        self.model = None
        self.scaler = StandardScaler()
        self.outcome_encoder = {
            'ball': 0,
            'called_strike': 1,
            'swinging_strike': 2,
            'foul': 3,
            'in_play': 4
        }

    def create_pitch_data(self, n_samples=5000):
        """Generate synthetic pitch data"""
        np.random.seed(42)

        data = {
            # Pitch characteristics
            'release_speed': np.random.normal(92, 5, n_samples),
            'release_spin_rate': np.random.normal(2300, 300, n_samples),
            'pfx_x': np.random.normal(0, 8, n_samples),  # horizontal movement
            'pfx_z': np.random.normal(0, 10, n_samples),  # vertical movement
            'plate_x': np.random.uniform(-2, 2, n_samples),  # horizontal location
            'plate_z': np.random.uniform(1, 4, n_samples),  # vertical location
            'release_extension': np.random.normal(6.2, 0.3, n_samples),

            # Count
            'balls': np.random.randint(0, 4, n_samples),
            'strikes': np.random.randint(0, 3, n_samples),

            # Matchup
            'pitcher_handedness': np.random.choice([0, 1], n_samples),  # 0=L, 1=R
            'batter_handedness': np.random.choice([0, 1], n_samples),
            'pitch_number': np.random.randint(1, 12, n_samples),

            # Pitch type (encoded)
            'pitch_type': np.random.choice([0, 1, 2, 3, 4], n_samples),  # FF, SL, CH, CU, SI
        }

        df = pd.DataFrame(data)

        # Generate outcomes based on features (simplified logic)
        outcomes = []
        for _, row in df.iterrows():
            # Distance from strike zone center
            distance_from_center = np.sqrt(row['plate_x']**2 + (row['plate_z'] - 2.5)**2)

            # Outcome probabilities depend only on pitch location in this simplified setup
            if distance_from_center > 1.2:  # Outside zone
                probs = [0.6, 0.15, 0.1, 0.1, 0.05]  # Likely ball
            elif distance_from_center < 0.3:  # Middle-middle
                probs = [0.05, 0.1, 0.15, 0.25, 0.45]  # Likely in play
            else:  # Edge of zone
                probs = [0.2, 0.25, 0.2, 0.25, 0.1]  # Mixed

            outcome = np.random.choice(list(self.outcome_encoder.values()), p=probs)
            outcomes.append(outcome)

        df['outcome'] = outcomes

        return df

    def build_model(self, input_dim, num_classes=5):
        """Build deep neural network for pitch outcome prediction"""
        model = keras.Sequential([
            layers.Input(shape=(input_dim,)),
            layers.Dense(128, activation='relu'),
            layers.BatchNormalization(),
            layers.Dropout(0.3),

            layers.Dense(64, activation='relu'),
            layers.BatchNormalization(),
            layers.Dropout(0.3),

            layers.Dense(32, activation='relu'),
            layers.BatchNormalization(),
            layers.Dropout(0.2),

            layers.Dense(16, activation='relu'),

            layers.Dense(num_classes, activation='softmax')
        ])

        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=0.001),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )

        return model

    def train(self, epochs=50, batch_size=32):
        """Train the pitch outcome model"""
        # Generate data
        df = self.create_pitch_data(n_samples=10000)

        # Prepare features and target
        feature_cols = ['release_speed', 'release_spin_rate', 'pfx_x', 'pfx_z',
                       'plate_x', 'plate_z', 'release_extension', 'balls',
                       'strikes', 'pitcher_handedness', 'batter_handedness',
                       'pitch_number', 'pitch_type']

        X = df[feature_cols].values
        y = df['outcome'].values

        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # Scale features
        X_train = self.scaler.fit_transform(X_train)
        X_test = self.scaler.transform(X_test)

        # Build model
        self.model = self.build_model(input_dim=X_train.shape[1])

        # Callbacks
        early_stopping = keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True
        )

        reduce_lr = keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=0.00001
        )

        # Train
        history = self.model.fit(
            X_train, y_train,
            validation_split=0.2,
            epochs=epochs,
            batch_size=batch_size,
            callbacks=[early_stopping, reduce_lr],
            verbose=1
        )

        # Evaluate
        test_loss, test_accuracy = self.model.evaluate(X_test, y_test, verbose=0)
        print(f"\nTest Accuracy: {test_accuracy:.4f}")
        print(f"Test Loss: {test_loss:.4f}")

        return history

    def predict_outcome(self, pitch_features):
        """Predict outcome for new pitch"""
        scaled_features = self.scaler.transform([pitch_features])
        probabilities = self.model.predict(scaled_features, verbose=0)[0]

        outcome_names = ['Ball', 'Called Strike', 'Swinging Strike', 'Foul', 'In Play']
        prediction = {name: prob for name, prob in zip(outcome_names, probabilities)}

        return prediction

# Train the model
print("\n" + "="*60)
print("Training Pitch Outcome Prediction Model")
print("="*60)
predictor = PitchOutcomePredictor()
history = predictor.train(epochs=30)

# Example prediction
sample_pitch = [94.5, 2400, -5, 12, 0.2, 2.8, 6.3, 2, 1, 1, 0, 5, 0]
prediction = predictor.predict_outcome(sample_pitch)
print("\nSample Pitch Prediction:")
for outcome, prob in prediction.items():
    print(f"{outcome}: {prob:.3f}")

Advanced Architecture: Embedding Layers for Categorical Variables

For pitch outcome prediction, we can use embedding layers to learn dense vector representations of categorical variables such as pitcher ID, batter ID, and pitch type, instead of feeding the network arbitrary integer codes.
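
An embedding layer is, at its core, a trainable lookup table: each category ID selects one row of a matrix. The NumPy sketch below illustrates the lookup and the concatenation with continuous features. The table sizes mirror the pitch-type embedding used in this section (10 types, 8 dimensions), but the table values are random placeholders rather than learned weights, and the IDs are hypothetical.

```python
import numpy as np

rng = np.random.default_rng(0)
n_pitch_types, embed_dim = 10, 8

# In a trained model these rows are learned; here they are random placeholders
embedding_table = rng.normal(0, 0.1, (n_pitch_types, embed_dim))

# Four pitches, identified by integer pitch-type codes (hypothetical)
pitch_type_ids = np.array([0, 1, 0, 4])

# The "embedding" is row indexing: one vector per pitch
pitch_vectors = embedding_table[pitch_type_ids]

# Equivalent view: a one-hot encoding multiplied by the table
one_hot = np.eye(n_pitch_types)[pitch_type_ids]
same_result = np.allclose(one_hot @ embedding_table, pitch_vectors)

# Embedded categories are concatenated with the continuous inputs
continuous = rng.normal(size=(4, 10))
combined = np.concatenate([continuous, pitch_vectors], axis=1)

print(pitch_vectors.shape, combined.shape, same_result)  # (4, 8) (4, 18) True
```

Because the lookup is equivalent to a one-hot vector times a weight matrix, the rows receive gradients like any other weights, which is how similar pitchers or pitch types can end up with similar vectors.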

def create_pitch_classifier_with_embeddings():
    """Advanced pitch outcome model with embeddings"""

    # Input layers for different feature types
    # Continuous features
    continuous_input = layers.Input(shape=(10,), name='continuous_features')

    # Categorical features (pitcher, batter, etc.)
    pitcher_input = layers.Input(shape=(1,), name='pitcher_id')
    batter_input = layers.Input(shape=(1,), name='batter_id')
    pitch_type_input = layers.Input(shape=(1,), name='pitch_type')

    # Embedding layers
    pitcher_embedding = layers.Embedding(
        input_dim=500,  # number of pitchers
        output_dim=16,
        name='pitcher_embedding'
    )(pitcher_input)
    pitcher_embedding = layers.Flatten()(pitcher_embedding)

    batter_embedding = layers.Embedding(
        input_dim=500,  # number of batters
        output_dim=16,
        name='batter_embedding'
    )(batter_input)
    batter_embedding = layers.Flatten()(batter_embedding)

    pitch_type_embedding = layers.Embedding(
        input_dim=10,  # number of pitch types
        output_dim=8,
        name='pitch_type_embedding'
    )(pitch_type_input)
    pitch_type_embedding = layers.Flatten()(pitch_type_embedding)

    # Concatenate all features
    concatenated = layers.Concatenate()([
        continuous_input,
        pitcher_embedding,
        batter_embedding,
        pitch_type_embedding
    ])

    # Deep network
    x = layers.Dense(128, activation='relu')(concatenated)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.3)(x)

    x = layers.Dense(64, activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.3)(x)

    x = layers.Dense(32, activation='relu')(x)
    x = layers.Dropout(0.2)(x)

    # Output layer
    output = layers.Dense(5, activation='softmax', name='outcome')(x)

    # Create model
    model = keras.Model(
        inputs=[continuous_input, pitcher_input, batter_input, pitch_type_input],
        outputs=output
    )

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

print("\nCreated advanced pitch classifier with embeddings")
advanced_model = create_pitch_classifier_with_embeddings()
advanced_model.summary()  # summary() prints the table itself and returns None

28.3 Player Performance Forecasting with LSTMs

Time Series Analysis with Recurrent Networks

Long Short-Term Memory (LSTM) networks are designed for sequential data, which makes them a natural candidate for forecasting player performance over time. We'll build a model that predicts a player's next-game OPS from their previous 10 games.
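
Recurrent models expect input of shape (samples, timesteps, features), so a game log first has to be cut into overlapping windows, each paired with the game that follows it. The helper below is a minimal sketch of that step for a single player; `make_windows` and the toy game log are hypothetical names and data used only for illustration.

```python
import numpy as np

def make_windows(games, sequence_length, target_idx):
    """Turn one player's game log into (history, next-game target) pairs.

    games: array of shape (n_games, n_features), in chronological order.
    Returns X with shape (n_games - sequence_length, sequence_length, n_features)
    and y with shape (n_games - sequence_length,).
    """
    X, y = [], []
    for start in range(len(games) - sequence_length):
        end = start + sequence_length
        X.append(games[start:end])        # the previous sequence_length games
        y.append(games[end, target_idx])  # the stat from the game that follows
    return np.array(X), np.array(y)

# Toy log: 6 games, 2 features; column 1 plays the role of the target stat
games = np.arange(12, dtype=float).reshape(6, 2)
X, y = make_windows(games, sequence_length=3, target_idx=1)

print(X.shape)  # (3, 3, 2)
print(y)        # targets are 7.0, 9.0 and 11.0
```

Windows should be built per player, as here, so that no sequence mixes the end of one player's season with the start of another's.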

class PlayerPerformanceLSTM:
    def __init__(self, sequence_length=10):
        self.sequence_length = sequence_length
        self.model = None
        self.scaler = StandardScaler()

    def create_player_timeseries(self, n_games=162, n_players=50):
        """Generate synthetic player performance time series"""
        np.random.seed(42)

        all_data = []

        for player_id in range(n_players):
            # Player skill level (affects baseline performance)
            skill = np.random.uniform(0.6, 1.4)

            # Seasonal trend
            games = np.arange(n_games)
            seasonal_trend = 0.1 * np.sin(2 * np.pi * games / n_games)

            # Random walk component
            random_walk = np.cumsum(np.random.normal(0, 0.05, n_games))
            random_walk = random_walk - random_walk.mean()

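            # Performance metrics
            # Poisson rates must be non-negative, so floor the hit rate
            hit_rate = np.clip(skill * 1.2 + seasonal_trend + random_walk, 0.05, None)
            at_bats = np.random.poisson(4, n_games)
            # Hits cannot exceed at-bats
            hits = np.minimum(np.random.poisson(hit_rate), at_bats)

            data = {
                'player_id': player_id,
                'game_number': games,
                'hits': hits,
                'at_bats': at_bats,
                'walks': np.random.poisson(skill * 0.3, n_games),
                'strikeouts': np.random.poisson((2 - skill) * 0.8, n_games),
                # Home runs are a subset of hits
                'home_runs': np.minimum(np.random.poisson(skill * 0.15, n_games), hits),
            }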

            player_df = pd.DataFrame(data)

            # Calculate batting average and OPS for each game
            player_df['batting_avg'] = player_df['hits'] / player_df['at_bats'].clip(lower=1)
            player_df['obp'] = (player_df['hits'] + player_df['walks']) / \
                              (player_df['at_bats'] + player_df['walks']).clip(lower=1)
            player_df['slg'] = (player_df['hits'] + player_df['home_runs'] * 3) / \
                              player_df['at_bats'].clip(lower=1)
            player_df['ops'] = player_df['obp'] + player_df['slg']

            # Rolling averages (last 10 games)
            player_df['rolling_avg'] = player_df['batting_avg'].rolling(10, min_periods=1).mean()
            player_df['rolling_ops'] = player_df['ops'].rolling(10, min_periods=1).mean()

            all_data.append(player_df)

        return pd.concat(all_data, ignore_index=True)

    def prepare_sequences(self, data, target_col='ops'):
        """Create sequences for LSTM training"""
        feature_cols = ['batting_avg', 'obp', 'slg', 'ops',
                       'rolling_avg', 'rolling_ops', 'hits',
                       'walks', 'strikeouts', 'home_runs']

        # Column index of the statistic to predict
        target_idx = feature_cols.index(target_col)

        sequences = []
        targets = []

        # Group by player
        for player_id in data['player_id'].unique():
            player_data = data[data['player_id'] == player_id][feature_cols].values

            # Create sequences: each window of games is paired with the next game's target
            for i in range(len(player_data) - self.sequence_length):
                sequences.append(player_data[i:i + self.sequence_length])
                targets.append(player_data[i + self.sequence_length][target_idx])

        return np.array(sequences), np.array(targets)

    def build_lstm_model(self, input_shape):
        """Build LSTM model for performance forecasting"""
        model = keras.Sequential([
            layers.Input(shape=input_shape),  # (timesteps, features)
            layers.LSTM(64, return_sequences=True),
            layers.Dropout(0.3),

            layers.LSTM(32, return_sequences=True),
            layers.Dropout(0.3),

            layers.LSTM(16),
            layers.Dropout(0.2),

            layers.Dense(8, activation='relu'),
            layers.Dense(1)  # Predict next game OPS
        ])

        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=0.001),
            loss='mse',
            metrics=['mae']
        )

        return model

    def train(self, epochs=50, batch_size=32):
        """Train LSTM model"""
        print("Generating player time series data...")
        data = self.create_player_timeseries(n_games=162, n_players=100)

        print("Preparing sequences...")
        X, y = self.prepare_sequences(data)

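        # Split data: sequences are stored player by player, so this holds out
        # the last 20% of players rather than the last games of every player
        split_idx = int(0.8 * len(X))
        X_train, X_test = X[:split_idx], X[split_idx:]
        y_train, y_test = y[:split_idx], y[split_idx:]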

        # Scale each feature independently
        n_samples, n_timesteps, n_features = X_train.shape
        X_train_reshaped = X_train.reshape(-1, n_features)
        X_test_reshaped = X_test.reshape(-1, n_features)

        X_train_scaled = self.scaler.fit_transform(X_train_reshaped)
        X_test_scaled = self.scaler.transform(X_test_reshaped)

        X_train = X_train_scaled.reshape(n_samples, n_timesteps, n_features)
        X_test = X_test_scaled.reshape(-1, n_timesteps, n_features)

        print(f"Training data shape: {X_train.shape}")
        print(f"Test data shape: {X_test.shape}")

        # Build model
        self.model = self.build_lstm_model(input_shape=(n_timesteps, n_features))

        # Callbacks
        early_stopping = keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=15,
            restore_best_weights=True
        )

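        # Train (the test split doubles as the validation set here, so the
        # early-stopped test metrics are optimistic; hold out a separate
        # validation split for an unbiased estimate)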
        history = self.model.fit(
            X_train, y_train,
            validation_data=(X_test, y_test),
            epochs=epochs,
            batch_size=batch_size,
            callbacks=[early_stopping],
            verbose=1
        )

        # Evaluate
        test_loss, test_mae = self.model.evaluate(X_test, y_test, verbose=0)
        print(f"\nTest MAE: {test_mae:.4f}")
        print(f"Test MSE: {test_loss:.4f}")

        return history

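    def forecast_performance(self, recent_games):
        """Forecast next-game OPS from the last `sequence_length` games.

        recent_games: array of shape (sequence_length, n_features), with
        columns in the same order as feature_cols in prepare_sequences.
        """
        scaled_games = self.scaler.transform(np.asarray(recent_games))
        scaled_games = scaled_games.reshape(1, self.sequence_length, -1)
        prediction = self.model.predict(scaled_games, verbose=0)[0][0]
        return float(prediction)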

# Train LSTM model
print("\n" + "="*60)
print("Training LSTM for Player Performance Forecasting")
print("="*60)
lstm_model = PlayerPerformanceLSTM(sequence_length=10)
lstm_history = lstm_model.train(epochs=30)
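
The windowing step in prepare_sequences is easy to lose inside the class, so here is a minimal sketch of the same idea on a toy series. The helper name make_windows and the sample OPS values are illustrative assumptions, not part of the chapter's code.

```python
def make_windows(series, window):
    """Pair each run of `window` consecutive values with the value that follows it."""
    inputs, targets = [], []
    for start in range(len(series) - window):
        inputs.append(series[start:start + window])
        targets.append(series[start + window])
    return inputs, targets


# Hypothetical per-game OPS values for one player
ops_by_game = [0.70, 0.85, 0.60, 0.90, 0.75, 0.80]
windows, next_values = make_windows(ops_by_game, window=3)

for window_values, target in zip(windows, next_values):
    print(window_values, "->", target)
```

Applied to a 162-game season with sequence_length=10, the same loop yields 152 input windows per player, each paired with the OPS of the game that follows it.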

Bidirectional LSTM for Enhanced Context

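Bidirectional LSTMs process each input sequence in both the forward and backward directions and combine the two passes, so every timestep's representation draws on the games before and after it within the window. Because the window contains only past games, the backward pass does not leak information about the game being predicted.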

def build_bidirectional_lstm(input_shape):
    """Bidirectional LSTM for player performance"""
    model = keras.Sequential([
        layers.Bidirectional(layers.LSTM(64, return_sequences=True),
                           input_shape=input_shape),
        layers.Dropout(0.3),

        layers.Bidirectional(layers.LSTM(32)),
        layers.Dropout(0.3),

        layers.Dense(16, activation='relu'),
        layers.Dense(1)
    ])

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss='mse',
        metrics=['mae']
    )

    return model

print("\nBidirectional LSTM model created")
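
The extra context has a cost in model size. The sketch below estimates the trainable weights of the recurrent layers by hand, assuming the standard LSTM layout (four gates, bias enabled) and the default behavior of Bidirectional, which concatenates the forward and backward outputs. The helper lstm_param_count is a hypothetical name introduced for illustration.

```python
def lstm_param_count(input_dim, units):
    """Trainable weights in a standard LSTM layer: four gates, each with
    input weights, recurrent weights, and a bias vector."""
    return 4 * (units * (input_dim + units) + units)


n_features = 10  # the ten feature columns used by prepare_sequences

# First recurrent layer: LSTM(64) versus Bidirectional(LSTM(64))
first_uni = lstm_param_count(n_features, 64)
first_bi = 2 * first_uni  # one forward copy plus one backward copy

# Second recurrent layer: LSTM(32) fed by 64 features,
# versus Bidirectional(LSTM(32)) fed by the concatenated 128 features
second_uni = lstm_param_count(64, 32)
second_bi = 2 * lstm_param_count(2 * 64, 32)

print(f"Layer 1: {first_uni:,} unidirectional vs {first_bi:,} bidirectional")
print(f"Layer 2: {second_uni:,} unidirectional vs {second_bi:,} bidirectional")
```

The second layer grows by more than a factor of two because its input width doubles as well. Calling model.summary() on the built model is the authoritative check of these counts.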

28.4 Computer Vision for Baseball

Swing Analysis with Convolutional Neural Networks

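Computer vision enables automated analysis of player movements, pitch tracking, and defensive positioning. We start with a classifier that works on swing features of the kind a video pipeline would extract, then look at CNN architectures that operate on the frames themselves.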

class SwingAnalyzer:
    """Swing classifier on tabular swing features (a dense network, not a CNN)"""

    def __init__(self, img_height=224, img_width=224):
        self.img_height = img_height
        self.img_width = img_width
        self.model = None

    def create_synthetic_swing_data(self, n_samples=1000):
        """
        Generate synthetic swing features
        In practice, these would come from video frame analysis
        """
        np.random.seed(42)

        # Swing types: good, long, uppercut, level, ground_ball
        swing_types = ['good', 'long', 'uppercut', 'level', 'ground_ball']

        data = []
        for swing_type in swing_types:
            n = n_samples // len(swing_types)

            if swing_type == 'good':
                bat_speed = np.random.normal(75, 3, n)
                attack_angle = np.random.normal(12, 3, n)
                time_to_contact = np.random.normal(0.16, 0.01, n)
                vertical_bat_angle = np.random.normal(25, 5, n)
            elif swing_type == 'long':
                bat_speed = np.random.normal(70, 4, n)
                attack_angle = np.random.normal(10, 4, n)
                time_to_contact = np.random.normal(0.19, 0.02, n)
                vertical_bat_angle = np.random.normal(30, 6, n)
            elif swing_type == 'uppercut':
                bat_speed = np.random.normal(73, 3, n)
                attack_angle = np.random.normal(18, 3, n)
                time_to_contact = np.random.normal(0.17, 0.01, n)
                vertical_bat_angle = np.random.normal(35, 5, n)
            elif swing_type == 'level':
                bat_speed = np.random.normal(74, 3, n)
                attack_angle = np.random.normal(8, 2, n)
                time_to_contact = np.random.normal(0.16, 0.01, n)
                vertical_bat_angle = np.random.normal(20, 4, n)
            else:  # ground_ball
                bat_speed = np.random.normal(71, 4, n)
                attack_angle = np.random.normal(-5, 3, n)
                time_to_contact = np.random.normal(0.18, 0.02, n)
                vertical_bat_angle = np.random.normal(15, 5, n)

            for i in range(n):
                data.append({
                    'bat_speed': bat_speed[i],
                    'attack_angle': attack_angle[i],
                    'time_to_contact': time_to_contact[i],
                    'vertical_bat_angle': vertical_bat_angle[i],
                    'swing_type': swing_type
                })

        return pd.DataFrame(data)

    def build_swing_classifier(self, input_dim=4, num_classes=5):
        """Build classifier for swing analysis"""
        model = keras.Sequential([
            layers.Input(shape=(input_dim,)),

            layers.Dense(128, activation='relu'),
            layers.BatchNormalization(),
            layers.Dropout(0.4),

            layers.Dense(64, activation='relu'),
            layers.BatchNormalization(),
            layers.Dropout(0.3),

            layers.Dense(32, activation='relu'),
            layers.Dropout(0.2),

            layers.Dense(num_classes, activation='softmax')
        ])

        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=0.001),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )

        return model

    def train(self, epochs=40):
        """Train swing classification model"""
        df = self.create_synthetic_swing_data(n_samples=2000)

        # Encode swing types
        swing_encoder = {
            'good': 0, 'long': 1, 'uppercut': 2,
            'level': 3, 'ground_ball': 4
        }
        df['swing_type_encoded'] = df['swing_type'].map(swing_encoder)

        # Prepare data
        feature_cols = ['bat_speed', 'attack_angle', 'time_to_contact',
                       'vertical_bat_angle']
        X = df[feature_cols].values
        y = df['swing_type_encoded'].values

        # Split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # Scale
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)

        # Build and train
        self.model = self.build_swing_classifier()

        history = self.model.fit(
            X_train, y_train,
            validation_split=0.2,
            epochs=epochs,
            batch_size=32,
            verbose=1
        )

        # Evaluate
        test_loss, test_acc = self.model.evaluate(X_test, y_test, verbose=0)
        print(f"\nTest Accuracy: {test_acc:.4f}")

        return history, scaler, swing_encoder

# Train swing analyzer
print("\n" + "="*60)
print("Training Swing Analysis Model")
print("="*60)
swing_analyzer = SwingAnalyzer()
swing_history, swing_scaler, swing_encoder = swing_analyzer.train(epochs=30)
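
The train method returns the fitted scaler and the label encoder so that new swings can be classified later. As a minimal sketch of the last step, the helper below turns a vector of class probabilities back into a swing label. The function name decode_swing and the probability values are illustrative assumptions and were not produced by the trained model.

```python
def decode_swing(probabilities, encoder):
    """Return (label, confidence) for the most probable swing class."""
    # Invert the label -> index mapping used during training
    index_to_label = {index: label for label, index in encoder.items()}
    best_index = max(range(len(probabilities)), key=lambda i: probabilities[i])
    return index_to_label[best_index], probabilities[best_index]


swing_encoder_example = {
    'good': 0, 'long': 1, 'uppercut': 2,
    'level': 3, 'ground_ball': 4
}

# Hypothetical softmax output for a single swing
example_probs = [0.08, 0.05, 0.72, 0.10, 0.05]

label, confidence = decode_swing(example_probs, swing_encoder_example)
print(f"Predicted swing type: {label} ({confidence:.0%})")
```

With the trained model, the probabilities would come from swing_analyzer.model.predict(swing_scaler.transform(features))[0], where features is a 2D array whose four columns follow the order of feature_cols.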

CNN Architecture for Video Frame Analysis

For actual video analysis, we would use convolutional layers to process frames:

def build_video_frame_cnn(img_height=224, img_width=224):
    """
    CNN for analyzing swing video frames
    Input: Video frames of baseball swings
    Output: Swing classification or pose keypoints
    """
    model = keras.Sequential([
        # Convolutional blocks
        layers.Conv2D(32, (3, 3), activation='relu',
                     input_shape=(img_height, img_width, 3)),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),

        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),

        layers.Conv2D(128, (3, 3), activation='relu'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),

        layers.Conv2D(256, (3, 3), activation='relu'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),

        # Flatten and dense layers
        layers.Flatten(),
        layers.Dense(512, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(256, activation='relu'),
        layers.Dropout(0.3),

        # Output layer (5 swing classes)
        layers.Dense(5, activation='softmax')
    ])

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.0001),
        # Expects one-hot labels; use 'sparse_categorical_crossentropy'
        # for integer class indices
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

print("\nCNN for video frame analysis created")
video_cnn = build_video_frame_cnn()
print(f"Total parameters: {video_cnn.count_params():,}")
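
To see where the parameters of this network end up, it helps to trace the feature-map size by hand. The sketch below assumes the Keras defaults used above (unpadded 3x3 convolutions with stride 1, and 2x2 max pooling). The helper names are hypothetical.

```python
def conv_valid(size, kernel=3):
    """Output size of an unpadded ('valid') stride-1 convolution."""
    return size - kernel + 1


def max_pool(size, pool=2):
    """Output size of non-overlapping max pooling (floor division)."""
    return size // pool


size = 224
filters_per_block = [32, 64, 128, 256]
for filters in filters_per_block:
    size = max_pool(conv_valid(size))
    print(f"after {filters}-filter block: {size} x {size} x {filters}")

flattened = size * size * filters_per_block[-1]
dense_params = flattened * 512 + 512  # weights plus biases of Dense(512)
print(f"flattened features: {flattened:,}")
print(f"Dense(512) parameters: {dense_params:,}")
```

Nearly all of the weights sit in the first dense layer after Flatten. Swapping Flatten for GlobalAveragePooling2D, as the transfer learning model in this section does, would shrink that layer's input from 36,864 values to 256.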

Transfer Learning for Pitch Recognition

Pre-trained models such as ResNet or EfficientNet can be adapted to recognize pitch types from broadcast video. The example below freezes a ResNet50 backbone and trains only a new classification head:

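def build_pitch_recognition_model():
    """
    Transfer learning model for pitch type recognition
    Uses pre-trained ResNet50 as feature extractor
    Input frames should be 224x224 RGB and passed through
    keras.applications.resnet50.preprocess_input before training or inference
    """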
    # Load pre-trained ResNet50
    base_model = keras.applications.ResNet50(
        weights='imagenet',
        include_top=False,
        input_shape=(224, 224, 3)
    )

    # Freeze base model layers
    base_model.trainable = False

    # Add custom classification layers
    model = keras.Sequential([
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dense(256, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(10, activation='softmax')  # 10 pitch types
    ])

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.0001),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

print("\nTransfer learning model for pitch recognition created")
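
Both CNN models above are compiled with categorical_crossentropy, which expects one-hot targets, whereas the SwingAnalyzer classifier uses the sparse variant and takes integer class indices directly. A minimal sketch of the conversion, with a hypothetical helper name and made-up labels:

```python
def one_hot(label_index, num_classes):
    """Encode an integer class index as a one-hot vector."""
    vector = [0.0] * num_classes
    vector[label_index] = 1.0
    return vector


# Hypothetical integer labels for four frames, five swing classes
integer_labels = [0, 2, 4, 2]
one_hot_labels = [one_hot(index, num_classes=5) for index in integer_labels]

for index, encoded in zip(integer_labels, one_hot_labels):
    print(index, "->", encoded)
```

In a Keras pipeline, keras.utils.to_categorical performs the same conversion on whole arrays. Alternatively, switching the loss to sparse_categorical_crossentropy avoids the conversion altogether.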

28.5 Trajectory Modeling with Neural Networks

Physics-Informed Neural Networks for Ball Flight

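Traditional physics models describe ball flight with differential equations for gravity, drag, and the Magnus force. Neural networks can learn the same input-output relationships directly from data, and physics can enter through the training data or through the loss. In this example it enters through the data: the network is trained on trajectories generated from a simplified projectile model.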

class TrajectoryPredictor:
    """Neural network for baseball trajectory prediction"""

    def __init__(self):
        self.model = None
        self.scaler_X = StandardScaler()
        self.scaler_y = StandardScaler()

    def generate_trajectory_data(self, n_samples=5000):
        """
        Generate synthetic trajectory data based on physics
        Simplified model of batted ball flight
        """
        np.random.seed(42)

        data = []

        for _ in range(n_samples):
            # Initial conditions
            exit_velocity = np.random.uniform(70, 110)  # mph
            launch_angle = np.random.uniform(-10, 50)  # degrees
            spray_angle = np.random.uniform(-45, 45)  # degrees
            spin_rate = np.random.uniform(1000, 3500)  # rpm

            # Convert to SI units for physics calculation
            v0 = exit_velocity * 0.44704  # mph to m/s
            theta = np.radians(launch_angle)
            phi = np.radians(spray_angle)

            # Simplified trajectory calculation (vacuum projectile: ignores
            # drag and the Magnus force, so spin_rate does not affect the targets)
            g = 9.81  # gravity, m/s^2

            # Time of flight (simplified)
            if theta > 0:
                t_flight = 2 * v0 * np.sin(theta) / g
            else:
                t_flight = 0.1

            # Landing distance (simplified)
            distance = (v0**2 * np.sin(2 * theta) / g) * 3.281  # meters to feet
            distance = max(0, distance + np.random.normal(0, 10))  # Add noise

            # Maximum height
            max_height = (v0**2 * np.sin(theta)**2) / (2 * g) * 3.281  # meters to feet
            max_height = max(0, max_height + np.random.normal(0, 5))

            # Hang time
            hang_time = t_flight + np.random.normal(0, 0.1)
            hang_time = max(0, hang_time)

            # Landing location (x, y) on field
            landing_x = distance * np.cos(phi)
            landing_y = distance * np.sin(phi)

            data.append({
                'exit_velocity': exit_velocity,
                'launch_angle': launch_angle,
                'spray_angle': spray_angle,
                'spin_rate': spin_rate,
                'distance': distance,
                'max_height': max_height,
                'hang_time': hang_time,
                'landing_x': landing_x,
                'landing_y': landing_y
            })

        return pd.DataFrame(data)

    def build_trajectory_model(self, input_dim=4):
        """
        Build neural network for trajectory prediction
        Multiple outputs: distance, max_height, hang_time, landing coordinates
        """
        inputs = layers.Input(shape=(input_dim,))

        # Shared layers
        x = layers.Dense(128, activation='relu')(inputs)
        x = layers.BatchNormalization()(x)
        x = layers.Dropout(0.3)(x)

        x = layers.Dense(64, activation='relu')(x)
        x = layers.BatchNormalization()(x)
        x = layers.Dropout(0.3)(x)

        x = layers.Dense(32, activation='relu')(x)
        x = layers.Dropout(0.2)(x)

        # Multiple output heads
        distance_output = layers.Dense(16, activation='relu')(x)
        distance_output = layers.Dense(1, name='distance')(distance_output)

        height_output = layers.Dense(16, activation='relu')(x)
        height_output = layers.Dense(1, name='max_height')(height_output)

        time_output = layers.Dense(16, activation='relu')(x)
        time_output = layers.Dense(1, name='hang_time')(time_output)

        location_output = layers.Dense(16, activation='relu')(x)
        location_output = layers.Dense(2, name='landing_location')(location_output)

        # Create model
        model = keras.Model(
            inputs=inputs,
            outputs=[distance_output, height_output, time_output, location_output]
        )

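        # Compile with one MSE loss per output head, weighted by importance.
        # Metrics are given per output because some Keras versions reject a
        # single shared list for multi-output models.
        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=0.001),
            loss={
                'distance': 'mse',
                'max_height': 'mse',
                'hang_time': 'mse',
                'landing_location': 'mse'
            },
            loss_weights={
                'distance': 1.0,
                'max_height': 0.5,
                'hang_time': 0.5,
                'landing_location': 0.8
            },
            metrics={
                'distance': ['mae'],
                'max_height': ['mae'],
                'hang_time': ['mae'],
                'landing_location': ['mae']
            }
        )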

        return model

    def train(self, epochs=50):
        """Train trajectory prediction model"""
        print("Generating trajectory data...")
        df = self.generate_trajectory_data(n_samples=10000)

        # Prepare data
        input_cols = ['exit_velocity', 'launch_angle', 'spray_angle', 'spin_rate']
        X = df[input_cols].values

        y_distance = df['distance'].values.reshape(-1, 1)
        y_height = df['max_height'].values.reshape(-1, 1)
        y_time = df['hang_time'].values.reshape(-1, 1)
        y_location = df[['landing_x', 'landing_y']].values

        # Split data
        split_idx = int(0.8 * len(X))
        X_train, X_test = X[:split_idx], X[split_idx:]

        y_dist_train, y_dist_test = y_distance[:split_idx], y_distance[split_idx:]
        y_height_train, y_height_test = y_height[:split_idx], y_height[split_idx:]
        y_time_train, y_time_test = y_time[:split_idx], y_time[split_idx:]
        y_loc_train, y_loc_test = y_location[:split_idx], y_location[split_idx:]

        # Scale features
        X_train = self.scaler_X.fit_transform(X_train)
        X_test = self.scaler_X.transform(X_test)

        # Build model
        self.model = self.build_trajectory_model(input_dim=4)

        # Train
        history = self.model.fit(
            X_train,
            {
                'distance': y_dist_train,
                'max_height': y_height_train,
                'hang_time': y_time_train,
                'landing_location': y_loc_train
            },
            validation_data=(
                X_test,
                {
                    'distance': y_dist_test,
                    'max_height': y_height_test,
                    'hang_time': y_time_test,
                    'landing_location': y_loc_test
                }
            ),
            epochs=epochs,
            batch_size=32,
            verbose=1
        )

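        print("\nEvaluating model...")
        results = self.model.evaluate(
            X_test,
            {
                'distance': y_dist_test,
                'max_height': y_height_test,
                'hang_time': y_time_test,
                'landing_location': y_loc_test
            },
            verbose=0,
            return_dict=True
        )
        # Report the total loss and each head's loss and MAE on the test set
        for name, value in results.items():
            print(f"{name}: {value:.4f}")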

        return history

    def predict_trajectory(self, exit_velocity, launch_angle, spray_angle, spin_rate):
        """Predict trajectory for given initial conditions"""
        X = np.array([[exit_velocity, launch_angle, spray_angle, spin_rate]])
        X_scaled = self.scaler_X.transform(X)

        predictions = self.model.predict(X_scaled, verbose=0)

        result = {
            'distance': predictions[0][0][0],
            'max_height': predictions[1][0][0],
            'hang_time': predictions[2][0][0],
            'landing_x': predictions[3][0][0],
            'landing_y': predictions[3][0][1]
        }

        return result

# Train trajectory model
print("\n" + "="*60)
print("Training Trajectory Prediction Model")
print("="*60)
trajectory_model = TrajectoryPredictor()
traj_history = trajectory_model.train(epochs=40)

# Example prediction
print("\nExample Trajectory Prediction:")
print("Input: 105 mph exit velocity, 28° launch angle, 10° spray angle, 2200 rpm spin")
prediction = trajectory_model.predict_trajectory(105, 28, 10, 2200)
for key, value in prediction.items():
    print(f"{key}: {value:.2f}")
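
As a sanity check on the network's predictions, the drag-free projectile formulas behind the simplified physics model can be evaluated directly. The sketch below is a minimal closed-form baseline and not part of the chapter's model: the function name `ideal_trajectory` and its return keys are illustrative assumptions, it ignores drag and Magnus force, and it converts meters to feet by multiplying by 3.28084 (equivalently, dividing by 0.3048).

```python
import math

MPH_TO_MPS = 0.44704   # miles per hour -> meters per second
M_TO_FT = 3.28084      # meters -> feet
G = 9.81               # gravitational acceleration, m/s^2


def ideal_trajectory(exit_velocity_mph, launch_angle_deg, spray_angle_deg):
    """Drag-free projectile baseline (hypothetical helper for illustration).

    Returns distance, max height, and landing coordinates in feet and hang
    time in seconds. Balls launched at or below 0 degrees are treated as
    landing immediately.
    """
    v0 = exit_velocity_mph * MPH_TO_MPS
    theta = math.radians(launch_angle_deg)
    phi = math.radians(spray_angle_deg)

    if theta <= 0:
        return {'distance': 0.0, 'max_height': 0.0, 'hang_time': 0.0,
                'landing_x': 0.0, 'landing_y': 0.0}

    hang_time = 2 * v0 * math.sin(theta) / G               # seconds
    distance_m = v0 ** 2 * math.sin(2 * theta) / G          # meters
    max_height_m = (v0 * math.sin(theta)) ** 2 / (2 * G)    # meters

    distance_ft = distance_m * M_TO_FT
    return {
        'distance': distance_ft,
        'max_height': max_height_m * M_TO_FT,
        'hang_time': hang_time,
        'landing_x': distance_ft * math.cos(phi),
        'landing_y': distance_ft * math.sin(phi),
    }


# Same initial conditions as the example prediction (spin is ignored here)
baseline = ideal_trajectory(105, 28, 10)
for key, value in baseline.items():
    print(f"{key}: {value:.2f}")
```

Because air resistance is ignored, the baseline distance is much longer than a real batted ball would travel. Its use is to confirm that units and signs are consistent before trusting a trained model's output.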

28.6 Building a Pitch Type Classifier

End-to-End Pitch Classification Pipeline

This section walks through a complete pitch type classifier built on a neural network, from synthetic data generation and feature engineering through training, evaluation, and prediction.

class PitchTypeClassifier:
    """Complete pitch type classification system"""

    def __init__(self):
        self.model = None
        self.scaler = StandardScaler()
        self.pitch_types = [
            'Fastball', 'Sinker', 'Cutter', 'Slider',
            'Curveball', 'Changeup', 'Splitter', 'Knuckleball'
        ]
        self.encoder = {pitch: i for i, pitch in enumerate(self.pitch_types)}
        self.decoder = {i: pitch for i, pitch in enumerate(self.pitch_types)}

    def generate_pitch_data(self, n_samples=10000):
        """Generate synthetic pitch data by sampling each pitch type's features from normal distributions"""
        np.random.seed(42)

        data = []

        # Define characteristics for each pitch type
        pitch_profiles = {
            'Fastball': {
                'velocity': (92, 3),
                'spin_rate': (2300, 200),
                'h_break': (0, 6),
                'v_break': (15, 4),
                'extension': (6.2, 0.3)
            },
            'Sinker': {
                'velocity': (92, 3),
                'spin_rate': (2100, 200),
                'h_break': (12, 4),
                'v_break': (5, 3),
                'extension': (6.1, 0.3)
            },
            'Cutter': {
                'velocity': (89, 3),
                'spin_rate': (2400, 200),
                'h_break': (-4, 3),
                'v_break': (10, 3),
                'extension': (6.0, 0.3)
            },
            'Slider': {
                'velocity': (85, 3),
                'spin_rate': (2500, 250),
                'h_break': (4, 4),
                'v_break': (-2, 3),
                'extension': (6.0, 0.3)
            },
            'Curveball': {
                'velocity': (78, 3),
                'spin_rate': (2700, 300),
                'h_break': (8, 4),
                'v_break': (-8, 3),
                'extension': (5.9, 0.3)
            },
            'Changeup': {
                'velocity': (84, 3),
                'spin_rate': (1700, 200),
                'h_break': (14, 4),
                'v_break': (8, 3),
                'extension': (6.1, 0.3)
            },
            'Splitter': {
                'velocity': (86, 3),
                'spin_rate': (1500, 200),
                'h_break': (10, 4),
                'v_break': (-4, 3),
                'extension': (6.0, 0.3)
            },
            'Knuckleball': {
                'velocity': (68, 4),
                'spin_rate': (300, 100),
                'h_break': (0, 10),
                'v_break': (0, 8),
                'extension': (5.8, 0.4)
            }
        }

        samples_per_type = n_samples // len(self.pitch_types)

        for pitch_type, profile in pitch_profiles.items():
            for _ in range(samples_per_type):
                velocity = np.random.normal(*profile['velocity'])
                spin_rate = np.random.normal(*profile['spin_rate'])
                h_break = np.random.normal(*profile['h_break'])
                v_break = np.random.normal(*profile['v_break'])
                extension = np.random.normal(*profile['extension'])

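                # Spin axis is sampled uniformly for every pitch type, so in this
                # synthetic data it carries no information about the class
                spin_axis = np.random.uniform(0, 360)  # degrees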

                data.append({
                    'release_speed': velocity,
                    'release_spin_rate': spin_rate,
                    'pfx_x': h_break,
                    'pfx_z': v_break,
                    'release_extension': extension,
                    'release_pos_x': np.random.normal(0, 0.5),
                    'release_pos_z': np.random.normal(6, 0.3),
                    'spin_axis': spin_axis,
                    'pitch_type': pitch_type
                })

        return pd.DataFrame(data)

    def create_advanced_features(self, df):
        """Engineer advanced features for better classification"""
        # Work on a copy so the caller's DataFrame is not modified in place
        df = df.copy()

        # Velocity-adjusted spin
        df['spin_per_velocity'] = df['release_spin_rate'] / df['release_speed']

        # Total break
        df['total_break'] = np.sqrt(df['pfx_x']**2 + df['pfx_z']**2)

        # Break angle
        df['break_angle'] = np.arctan2(df['pfx_z'], df['pfx_x']) * 180 / np.pi

        # Velocity bands
        df['velo_diff_from_90'] = df['release_speed'] - 90

        # Interaction terms
        df['spin_break_interaction'] = df['release_spin_rate'] * df['total_break']

        return df

    def build_classifier(self, input_dim):
        """Build deep neural network classifier"""
        model = keras.Sequential([
            layers.Input(shape=(input_dim,)),

            layers.Dense(256, activation='relu'),
            layers.BatchNormalization(),
            layers.Dropout(0.4),

            layers.Dense(128, activation='relu'),
            layers.BatchNormalization(),
            layers.Dropout(0.3),

            layers.Dense(64, activation='relu'),
            layers.BatchNormalization(),
            layers.Dropout(0.3),

            layers.Dense(32, activation='relu'),
            layers.Dropout(0.2),

            layers.Dense(len(self.pitch_types), activation='softmax')
        ])

        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=0.001),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )

        return model

    def train(self, epochs=50):
        """Train the pitch type classifier"""
        print("Generating pitch data...")
        df = self.generate_pitch_data(n_samples=15000)

        print("Engineering features...")
        df = self.create_advanced_features(df)

        # Encode target
        df['pitch_type_encoded'] = df['pitch_type'].map(self.encoder)

        # Select features
        feature_cols = [
            'release_speed', 'release_spin_rate', 'pfx_x', 'pfx_z',
            'release_extension', 'release_pos_x', 'release_pos_z',
            'spin_axis', 'spin_per_velocity', 'total_break',
            'break_angle', 'velo_diff_from_90', 'spin_break_interaction'
        ]

        X = df[feature_cols].values
        y = df['pitch_type_encoded'].values

        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # Scale features
        X_train = self.scaler.fit_transform(X_train)
        X_test = self.scaler.transform(X_test)

        # Build model
        self.model = self.build_classifier(input_dim=len(feature_cols))

        # Callbacks
        early_stopping = keras.callbacks.EarlyStopping(
            monitor='val_accuracy',
            patience=15,
            restore_best_weights=True
        )

        reduce_lr = keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=0.00001
        )

        # Train
        print("\nTraining classifier...")
        history = self.model.fit(
            X_train, y_train,
            validation_split=0.2,
            epochs=epochs,
            batch_size=32,
            callbacks=[early_stopping, reduce_lr],
            verbose=1
        )

        # Evaluate
        test_loss, test_acc = self.model.evaluate(X_test, y_test, verbose=0)
        print(f"\nTest Accuracy: {test_acc:.4f}")

        # Per-class metrics and confusion matrix
        from sklearn.metrics import classification_report, confusion_matrix

        y_pred = np.argmax(self.model.predict(X_test, verbose=0), axis=1)

        print("\nClassification Report:")
        print(classification_report(y_test, y_pred,
                                   target_names=self.pitch_types))

        print("Confusion Matrix (rows: true, columns: predicted):")
        print(confusion_matrix(y_test, y_pred))

        return history

    def predict_pitch_type(self, pitch_features):
        """Predict pitch type with probabilities"""
        # Engineer same features as training
        features = {
            'release_speed': pitch_features[0],
            'release_spin_rate': pitch_features[1],
            'pfx_x': pitch_features[2],
            'pfx_z': pitch_features[3],
            'release_extension': pitch_features[4],
            'release_pos_x': pitch_features[5],
            'release_pos_z': pitch_features[6],
            'spin_axis': pitch_features[7]
        }

        # Derived features
        features['spin_per_velocity'] = features['release_spin_rate'] / features['release_speed']
        features['total_break'] = np.sqrt(features['pfx_x']**2 + features['pfx_z']**2)
        features['break_angle'] = np.arctan2(features['pfx_z'], features['pfx_x']) * 180 / np.pi
        features['velo_diff_from_90'] = features['release_speed'] - 90
        features['spin_break_interaction'] = features['release_spin_rate'] * features['total_break']

        # Create feature vector
        X = np.array([[
            features['release_speed'], features['release_spin_rate'],
            features['pfx_x'], features['pfx_z'], features['release_extension'],
            features['release_pos_x'], features['release_pos_z'],
            features['spin_axis'], features['spin_per_velocity'],
            features['total_break'], features['break_angle'],
            features['velo_diff_from_90'], features['spin_break_interaction']
        ]])

        X_scaled = self.scaler.transform(X)
        probabilities = self.model.predict(X_scaled, verbose=0)[0]

        predictions = {pitch: prob for pitch, prob in
                      zip(self.pitch_types, probabilities)}

        predicted_type = max(predictions, key=predictions.get)

        return predicted_type, predictions

# Train pitch type classifier
print("\n" + "="*60)
print("Training Pitch Type Classifier")
print("="*60)
pitch_classifier = PitchTypeClassifier()
pitch_history = pitch_classifier.train(epochs=40)

# Example predictions
print("\n" + "="*60)
print("Example Pitch Predictions")
print("="*60)

# Fastball
fastball_features = [94, 2350, 0, 16, 6.2, 0, 6.0, 180]
pitch_type, probs = pitch_classifier.predict_pitch_type(fastball_features)
print(f"\nFastball-like pitch (94 mph, 2350 rpm):")
print(f"Predicted: {pitch_type}")
print(f"Top 3 probabilities:")
sorted_probs = sorted(probs.items(), key=lambda x: x[1], reverse=True)[:3]
for pitch, prob in sorted_probs:
    print(f"  {pitch}: {prob:.3f}")

# Curveball
curveball_features = [77, 2750, 8, -9, 5.9, 0, 6.0, 90]
pitch_type, probs = pitch_classifier.predict_pitch_type(curveball_features)
print(f"\nCurveball-like pitch (77 mph, 2750 rpm):")
print(f"Predicted: {pitch_type}")
print(f"Top 3 probabilities:")
sorted_probs = sorted(probs.items(), key=lambda x: x[1], reverse=True)[:3]
for pitch, prob in sorted_probs:
    print(f"  {pitch}: {prob:.3f}")
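
The derived features are computed twice in the class above, once on a DataFrame during training and once by hand at prediction time, so the two code paths can drift apart if only one is edited. One way to keep them aligned is a single function that both paths call. The sketch below is a hypothetical refactoring (the name `derive_pitch_features` is an assumption, not part of the chapter's class) that works on plain Python numbers and doubles as a worked example for the fastball-like pitch used above.

```python
import math


def derive_pitch_features(release_speed, release_spin_rate, pfx_x, pfx_z):
    """Compute the five engineered features from raw pitch measurements.

    Hypothetical helper for illustration; the formulas mirror the ones in
    create_advanced_features and predict_pitch_type.
    """
    total_break = math.hypot(pfx_x, pfx_z)
    return {
        'spin_per_velocity': release_spin_rate / release_speed,
        'total_break': total_break,
        'break_angle': math.degrees(math.atan2(pfx_z, pfx_x)),
        'velo_diff_from_90': release_speed - 90,
        'spin_break_interaction': release_spin_rate * total_break,
    }


# Fastball-like pitch from the example: 94 mph, 2350 rpm,
# horizontal break 0, vertical break 16
derived = derive_pitch_features(94, 2350, 0, 16)
for name, value in derived.items():
    print(f"{name}: {value:.2f}")
```

A vectorized version for training would swap the `math` calls for the NumPy equivalents the class already uses (`np.sqrt`, `np.arctan2`), so the same function could serve both the DataFrame and the single-pitch path.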

28.7 Practical Deep Learning Pipelines

Complete Pipeline: From Data to Deployment

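This section assembles a production-style deep learning pipeline for baseball analytics, covering data validation, preprocessing, model building, cross-validated training, and saving the trained artifacts to disk.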

import joblib
from pathlib import Path
import json

class BaseballMLPipeline:
    """
    Production-ready ML pipeline for baseball predictions
    Includes data validation, preprocessing, training, and inference
    """

    def __init__(self, model_dir='models'):
        self.model_dir = Path(model_dir)
        # parents=True lets nested paths such as 'artifacts/models' be created
        self.model_dir.mkdir(parents=True, exist_ok=True)

        self.model = None
        self.preprocessor = None
        self.config = {}

    def validate_data(self, df, required_columns):
        """Validate input data"""
        missing_cols = set(required_columns) - set(df.columns)
        if missing_cols:
            raise ValueError(f"Missing required columns: {missing_cols}")

        # Check for nulls
        null_counts = df[required_columns].isnull().sum()
        if null_counts.any():
            print("Warning: Found null values:")
            print(null_counts[null_counts > 0])

        return True

    def preprocess_data(self, df, fit=False):
        """Preprocess data with validation"""
        if fit:
            self.preprocessor = StandardScaler()
            X_scaled = self.preprocessor.fit_transform(df)
        else:
            if self.preprocessor is None:
                raise ValueError("Preprocessor not fitted. Call with fit=True first.")
            X_scaled = self.preprocessor.transform(df)

        return X_scaled

    def build_model(self, input_dim, output_dim, task='classification'):
        """Build model based on task type"""
        if task == 'classification':
            activation = 'softmax'
            loss = 'sparse_categorical_crossentropy'
            metrics = ['accuracy']
        else:  # regression
            activation = 'linear'
            loss = 'mse'
            metrics = ['mae']

        model = keras.Sequential([
            layers.Input(shape=(input_dim,)),
            layers.Dense(256, activation='relu',
                        kernel_regularizer=keras.regularizers.l2(0.001)),
            layers.BatchNormalization(),
            layers.Dropout(0.4),

            layers.Dense(128, activation='relu',
                        kernel_regularizer=keras.regularizers.l2(0.001)),
            layers.BatchNormalization(),
            layers.Dropout(0.3),

            layers.Dense(64, activation='relu'),
            layers.Dropout(0.3),

            layers.Dense(output_dim, activation=activation)
        ])

        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=0.001),
            loss=loss,
            metrics=metrics
        )

        return model

    def train_with_cross_validation(self, X, y, n_splits=5):
        """Train with k-fold cross-validation"""
        from sklearn.model_selection import KFold

        kfold = KFold(n_splits=n_splits, shuffle=True, random_state=42)

        cv_scores = []

        for fold, (train_idx, val_idx) in enumerate(kfold.split(X)):
            print(f"\nFold {fold + 1}/{n_splits}")

            X_train, X_val = X[train_idx], X[val_idx]
            y_train, y_val = y[train_idx], y[val_idx]

            model = self.build_model(
                input_dim=X.shape[1],
                output_dim=1,
                task='regression'
            )

            history = model.fit(
                X_train, y_train,
                validation_data=(X_val, y_val),
                epochs=30,
                batch_size=32,
                verbose=0
            )

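            # Best-epoch validation loss; slightly optimistic, because the epoch
            # is selected on the same fold that it is scored on
            val_loss = min(history.history['val_loss'])
            cv_scores.append(val_loss)
            print(f"Validation Loss: {val_loss:.4f}")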

        print(f"\nCross-Validation Results:")
        print(f"Mean Loss: {np.mean(cv_scores):.4f}")
        print(f"Std Loss: {np.std(cv_scores):.4f}")

        return cv_scores

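    def save_model(self, model_name):
        """Save model, preprocessor, and config"""
        if self.model is None or self.preprocessor is None:
            raise ValueError("Nothing to save: train a model and fit the preprocessor first.")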
        model_path = self.model_dir / f"{model_name}.h5"
        preprocessor_path = self.model_dir / f"{model_name}_preprocessor.pkl"
        config_path = self.model_dir / f"{model_name}_config.json"

        self.model.save(model_path)
        joblib.dump(self.preprocessor, preprocessor_path)

        with open(config_path, 'w') as f:
            json.dump(self.config, f, indent=2)

        print(f"\nModel saved to {model_path}")
        print(f"Preprocessor saved to {preprocessor_path}")
        print(f"Config saved to {config_path}")

    def load_model(self, model_name):
        """Load model and preprocessor"""
        model_path = self.model_dir / f"{model_name}.h5"
        preprocessor_path = self.model_dir / f"{model_name}_preprocessor.pkl"
        config_path = self.model_dir / f"{model_name}_config.json"

        self.model = keras.models.load_model(model_path)
        self.preprocessor = joblib.load(preprocessor_path)

        with open(config_path, 'r') as f:
            self.config = json.load(f)

        print(f"Model loaded from {model_path}")

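    def predict_with_uncertainty(self, X, n_iterations=10):
        """
        Predict with uncertainty estimation using Monte Carlo Dropout
        (assumes self.model is a Sequential stack, as built by build_model)
        """
        if self.model is None:
            raise ValueError("No model available. Train or load a model first.")

        predictions = []

        for _ in range(n_iterations):
            # Keep only the Dropout layers stochastic. Calling the whole model
            # with training=True would also make BatchNormalization use batch
            # statistics and update its moving averages at inference time.
            out = X
            for layer in self.model.layers:
                if isinstance(layer, layers.Dropout):
                    out = layer(out, training=True)
                else:
                    out = layer(out)
            predictions.append(np.asarray(out))

        predictions = np.array(predictions)

        # Mean is the point estimate; std across passes is the uncertainty
        mean_prediction = predictions.mean(axis=0)
        std_prediction = predictions.std(axis=0)

        return mean_prediction, std_prediction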

# Example usage
print("\n" + "="*60)
print("Baseball ML Pipeline Example")
print("="*60)

pipeline = BaseballMLPipeline(model_dir='baseball_models')

# Create sample data
sample_data = create_batter_features()
feature_cols = ['exit_velocity', 'launch_angle', 'sprint_speed',
                'barrel_rate', 'hard_hit_rate', 'k_rate',
                'bb_rate', 'pull_rate', 'groundball_rate']

X = sample_data[feature_cols].values
y = sample_data['wOBA'].values.reshape(-1, 1)

# Validate
pipeline.validate_data(sample_data, feature_cols)

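# Preprocess
# Note: the scaler is fitted on all rows before the folds are split, so
# validation rows influence the scaling. Fine for a demo; fit per fold
# when you need a strict estimate.
X_processed = pipeline.preprocess_data(X, fit=True)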

# Cross-validation
cv_scores = pipeline.train_with_cross_validation(X_processed, y, n_splits=3)
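A stricter variant of the cross-validation step fits the scaling statistics inside each fold, so that validation rows never contribute to the mean and standard deviation applied to them. The following is a minimal sketch of that idea using only NumPy; the helpers `kfold_indices` and `scale_within_fold` are illustrative names that are not part of `BaseballMLPipeline`, and the data is synthetic.

```python
import numpy as np

def kfold_indices(n_samples, n_splits=3, seed=42):
    """Yield (train_idx, val_idx) pairs from one shuffled permutation."""
    rng = np.random.default_rng(seed)
    folds = np.array_split(rng.permutation(n_samples), n_splits)
    for i in range(n_splits):
        train_idx = np.concatenate([f for j, f in enumerate(folds) if j != i])
        yield train_idx, folds[i]

def scale_within_fold(X_train, X_val):
    """Standardize both splits with statistics from the training split only."""
    mean = X_train.mean(axis=0)
    std = X_train.std(axis=0)
    std[std == 0] = 1.0  # avoid dividing by zero for constant columns
    return (X_train - mean) / std, (X_val - mean) / std

# Synthetic stand-in for the nine batter features (assumed shape: 90 x 9)
rng = np.random.default_rng(0)
X_demo = rng.normal(loc=50.0, scale=10.0, size=(90, 9))

fold_shapes = []
for train_idx, val_idx in kfold_indices(len(X_demo), n_splits=3):
    X_tr, X_va = scale_within_fold(X_demo[train_idx], X_demo[val_idx])
    fold_shapes.append((X_tr.shape, X_va.shape))
    # Training columns are centered by construction; validation only roughly
    print(f"train mean {X_tr.mean():+.3f}, val mean {X_va.mean():+.3f}")
```

Inside the pipeline, the same effect could be obtained by fitting a fresh `StandardScaler` on `X[train_idx]` within the fold loop and applying it to `X[val_idx]`.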

Hyperparameter Tuning with Keras Tuner

try:
    import keras_tuner as kt

    def build_tunable_model(hp):
        """Build model with hyperparameter tuning"""
        model = keras.Sequential()
        model.add(layers.Input(shape=(9,)))

        # Tune number of layers
        for i in range(hp.Int('num_layers', 2, 5)):
            model.add(layers.Dense(
                units=hp.Int(f'units_{i}', min_value=32, max_value=256, step=32),
                activation='relu'
            ))
            model.add(layers.Dropout(
                hp.Float(f'dropout_{i}', min_value=0.0, max_value=0.5, step=0.1)
            ))

        model.add(layers.Dense(1))

        # Tune learning rate
        learning_rate = hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])

        model.compile(
            optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
            loss='mse',
            metrics=['mae']
        )

        return model

    print("\nHyperparameter tuning example created")
    print("To run tuning, use:")
    print("tuner = kt.RandomSearch(build_tunable_model, ...)")

except ImportError:
    print("\nKeras Tuner not installed. Install with: pip install keras-tuner")
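To make the search space concrete, the sketch below draws random configurations from the same ranges that `build_tunable_model` declares, using only the standard library. It does not call Keras Tuner, and `sample_config` is a hypothetical helper; it simply shows what a random search has to choose: a layer count, then a width and dropout rate for each layer that is actually used, plus a learning rate.

```python
import random

def sample_config(rng):
    """Draw one configuration from the ranges used in build_tunable_model."""
    num_layers = rng.randint(2, 5)  # both bounds inclusive
    config = {
        "num_layers": num_layers,
        "learning_rate": rng.choice([1e-2, 1e-3, 1e-4]),
    }
    for i in range(num_layers):
        # Layer widths: 32, 64, ..., 256 in steps of 32
        config[f"units_{i}"] = rng.choice(range(32, 257, 32))
        # Dropout rates: 0.0, 0.1, ..., 0.5 in steps of 0.1
        config[f"dropout_{i}"] = rng.choice(range(0, 6)) / 10
    return config

rng = random.Random(42)  # fixed seed so the draws are repeatable
trials = [sample_config(rng) for _ in range(5)]
for trial in trials:
    print(trial)
```

With Keras Tuner installed, the equivalent search is typically run by constructing `kt.RandomSearch` with an objective such as `'val_loss'` and a trial budget, then calling its `search` method with training and validation data.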

Model Monitoring and Performance Tracking

class ModelMonitor:
    """Monitor model performance over time"""

    def __init__(self):
        self.performance_history = []

    def log_prediction(self, y_true, y_pred, timestamp=None):
        """Log prediction for monitoring"""
        if timestamp is None:
            timestamp = pd.Timestamp.now()

        error = abs(y_true - y_pred)

        self.performance_history.append({
            'timestamp': timestamp,
            'y_true': y_true,
            'y_pred': y_pred,
            'error': error
        })

    def get_metrics(self, window='7D'):
        """Calculate metrics per fixed time window (e.g. '7D' = 7-day bins)"""
        if not self.performance_history:
            return {}

        df = pd.DataFrame(self.performance_history)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df = df.set_index('timestamp')

        # resample() groups errors into consecutive, non-overlapping windows
        # (these are binned metrics, not rolling ones)
        metrics = {
            'mae': df['error'].resample(window).mean(),
            'mse': (df['error'] ** 2).resample(window).mean(),
            'count': df['error'].resample(window).count()
        }

        return metrics

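    def detect_drift(self, threshold=0.1):
        """Detect degradation by comparing the last 50 errors to the first 50"""
        # Require 100 logged predictions so the two windows do not overlap
        if len(self.performance_history) < 100:
            return False

        recent_errors = [p['error'] for p in self.performance_history[-50:]]
        baseline_errors = [p['error'] for p in self.performance_history[:50]]

        recent_mae = np.mean(recent_errors)
        baseline_mae = np.mean(baseline_errors)

        # A zero baseline makes the relative change undefined
        if baseline_mae == 0:
            return bool(recent_mae > 0)

        # Relative increase in MAE: 0.1 means 10% worse than the baseline
        drift = (recent_mae - baseline_mae) / baseline_mae

        return bool(drift > threshold)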

print("\nModel monitoring system created")
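As a worked example of the rule in `detect_drift`, the sketch below applies the same relative-change formula to made-up error values. The numbers are illustrative only, and `relative_drift` is a hypothetical standalone helper, not a method of `ModelMonitor`.

```python
from statistics import mean

def relative_drift(baseline_errors, recent_errors):
    """Relative change in mean absolute error, recent vs. baseline."""
    baseline_mae = mean(baseline_errors)
    recent_mae = mean(recent_errors)
    if baseline_mae == 0:
        # A perfect baseline makes the ratio undefined; treat any error as drift
        return float("inf") if recent_mae > 0 else 0.0
    return (recent_mae - baseline_mae) / baseline_mae

# Illustrative absolute wOBA errors: 50 early predictions, 50 recent ones
baseline = [0.020] * 50
recent = [0.025] * 50

drift = relative_drift(baseline, recent)
print(f"drift = {drift:.2f}, flagged = {drift > 0.1}")
```

Here the recent MAE is 25% higher than the baseline, which exceeds the default threshold of 0.1, so the monitor would report drift.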

28.8 Exercises

Easy Exercises

Exercise 28.1: Basic Neural Network (Easy)

Build a simple neural network to predict whether a pitch will be a strike or ball based on plate location (plate_x, plate_z).

# Your solution here
# Hint: Use a binary classification model with sigmoid activation

Exercise 28.2: Feature Engineering (Easy)

Create 5 new features from pitch tracking data that might improve pitch outcome prediction. Explain why each feature might be useful.

Exercise 28.3: Model Evaluation (Easy)

Given a trained model, calculate and interpret the following metrics:


  • Accuracy

  • Precision

  • Recall

  • F1-score
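For reference, all four metrics can be computed directly from confusion-matrix counts. The sketch below is one minimal way to do so for a binary strike/ball classifier; the labels and predictions are invented for illustration, and `binary_metrics` is a hypothetical helper rather than part of the chapter's code.

```python
def binary_metrics(y_true, y_pred, positive=1):
    """Accuracy, precision, recall and F1 for a binary classifier."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    tn = len(pairs) - tp - fp - fn

    accuracy = (tp + tn) / len(pairs)
    # Guard the ratios that are undefined when a denominator is zero
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if (precision + recall) else 0.0)
    return {"accuracy": accuracy, "precision": precision,
            "recall": recall, "f1": f1}

# Invented labels: 1 = strike, 0 = ball
y_true = [1, 1, 1, 1, 0, 0, 0, 0, 0, 0]
y_pred = [1, 1, 1, 0, 1, 0, 0, 0, 0, 0]

metrics = binary_metrics(y_true, y_pred)
for name, value in metrics.items():
    print(f"{name}: {value:.3f}")
```

Precision answers "of the pitches predicted as strikes, how many were strikes?", while recall answers "of the actual strikes, how many did the model find?"; F1 is their harmonic mean.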

Medium Exercises

Exercise 28.4: Multi-Task Learning (Medium)

Build a neural network that simultaneously predicts:


  1. Whether a pitch will be a strike or ball

  2. Whether the batter will swing

Share weights between tasks and compare performance to two separate models.

# Your solution here
# Hint: Use multiple output heads with shared hidden layers

Exercise 28.5: Sequence Prediction (Medium)

Implement an LSTM that predicts the next pitch type in a sequence based on the previous 5 pitches and the game context (count, score, inning).

Exercise 28.6: Transfer Learning (Medium)

Use a pre-trained image model (ResNet, EfficientNet) to classify different batting stances from images. Fine-tune only the last few layers.

Hard Exercises

Exercise 28.7: Attention Mechanism (Hard)

Implement a Transformer-based model for pitch sequence prediction. The model should use attention to weigh the importance of different pitches in the sequence.

# Your solution here
# Hint: Use multi-head attention layers
import torch.nn as nn

class PitchTransformer(nn.Module):
    def __init__(self, d_model=64, nhead=4, num_layers=2):
        super().__init__()
        # Your implementation here
        pass

Exercise 28.8: GANs for Data Augmentation (Hard)

Build a Generative Adversarial Network (GAN) to generate synthetic pitch tracking data. Use this to augment a small dataset and show improved model performance.

Exercise 28.9: Reinforcement Learning for Strategy (Hard)

Implement a Deep Q-Network (DQN) that learns optimal pitch selection strategy. The agent should choose which pitch type to throw based on:


  • Current count

  • Batter statistics

  • Previous pitches in the at-bat

Exercise 28.10: Explainable AI (Hard)

Implement SHAP (SHapley Additive exPlanations) or LIME (Local Interpretable Model-agnostic Explanations) to explain individual predictions from your pitch outcome model. Create visualizations showing which features were most important for specific predictions.

# Your solution here
# Hint: Use the shap library
import shap

# explainer = shap.DeepExplainer(model, background_data)
# shap_values = explainer.shap_values(X_test)

Project Exercise

Exercise 28.11: Complete Baseball Analytics System (Hard)

Build an end-to-end system that:

  1. Collects pitch-by-pitch data (you can use synthetic data)
  2. Trains multiple models:
    • Pitch type classifier
    • Pitch outcome predictor
    • Batter performance forecaster
  3. Implements a REST API for predictions
  4. Creates a dashboard for monitoring model performance
  5. Includes automated retraining when performance degrades

Bonus challenges:


  • Implement A/B testing for model versions

  • Add model versioning and rollback capabilities

  • Create confidence intervals for predictions

  • Implement real-time inference with sub-100ms latency


Summary

In this chapter, we explored deep learning applications in baseball analytics:

  1. Neural Network Fundamentals: Learned how to apply feedforward networks to baseball statistics
  2. Pitch Outcome Prediction: Built multi-class classifiers for pitch results using deep networks
  3. LSTM for Time Series: Forecasted player performance using recurrent neural networks
  4. Computer Vision: Applied CNNs to swing analysis and pitch recognition
  5. Trajectory Modeling: Used physics-informed neural networks for ball flight prediction
  6. Pitch Classification: Created a complete pipeline for pitch type identification
  7. Production Pipelines: Implemented end-to-end systems with monitoring and deployment

Key Takeaways:


  • Deep learning excels at finding complex patterns in high-dimensional baseball data

  • LSTMs are powerful for sequential data like pitch sequences and player performance

  • Transfer learning can accelerate computer vision applications

  • Production systems require careful attention to data validation, monitoring, and retraining

  • Ensemble methods and uncertainty quantification improve prediction reliability

Further Reading:


  • "Deep Learning" by Goodfellow, Bengio, and Courville

  • "Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow" by Aurélien Géron

  • MLB Statcast data documentation

  • PyTorch and TensorFlow official tutorials

  • Papers on sports analytics from MIT Sloan Sports Analytics Conference

Tools and Libraries:


  • TensorFlow/Keras: High-level deep learning framework

  • PyTorch: Flexible deep learning framework for research

  • Keras Tuner: Hyperparameter optimization

  • SHAP: Model explainability

  • MLflow: Experiment tracking and model management

  • TensorBoard: Training visualization

Continue to Chapter 29 to explore ensemble methods and model stacking in baseball analytics.


Chapter Summary

In this chapter, you learned about deep learning for baseball. Key topics covered:

  • Introduction to Neural Networks for Baseball
  • Pitch Outcome Prediction with Deep Learning
  • Player Performance Forecasting with LSTMs
  • Computer Vision for Baseball
  • Trajectory Modeling with Neural Networks
  • Building a Pitch Type Classifier