Understanding Deep Learning in Baseball Context
Neural networks excel at finding complex, non-linear patterns in high-dimensional data—exactly what baseball analytics demands. Traditional statistical models often assume linear relationships or require manual feature engineering, but neural networks can automatically learn relevant features from raw data.
Key Applications in Baseball:
- Pitch outcome prediction
- Player performance forecasting
- Trajectory modeling
- Computer vision for swing and pitch analysis
- Injury risk assessment
- Game state evaluation
Neural Network Fundamentals
A neural network consists of layers of interconnected nodes (neurons) that transform input data through learned weights and biases. For baseball applications, we typically use:
- Feedforward Networks: For tabular baseball statistics
- Recurrent Networks (RNNs/LSTMs): For sequential data like pitch sequences
- Convolutional Networks (CNNs): For image/video analysis
- Transformer Networks: For attention-based modeling of complex interactions
Basic Neural Network Architecture
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
# Simple feedforward network for baseball statistics
class BaseballNN(nn.Module):
def __init__(self, input_size, hidden_sizes, output_size):
super(BaseballNN, self).__init__()
self.layers = nn.ModuleList()
# Input layer
self.layers.append(nn.Linear(input_size, hidden_sizes[0]))
# Hidden layers
for i in range(len(hidden_sizes) - 1):
self.layers.append(nn.Linear(hidden_sizes[i], hidden_sizes[i+1]))
# Output layer
self.layers.append(nn.Linear(hidden_sizes[-1], output_size))
self.relu = nn.ReLU()
self.dropout = nn.Dropout(0.3)
def forward(self, x):
for i, layer in enumerate(self.layers[:-1]):
x = self.relu(layer(x))
x = self.dropout(x)
x = self.layers[-1](x)
return x
# Example: Predicting batter performance
def create_batter_features():
"""Create synthetic batter statistics for demonstration"""
np.random.seed(42)
n_samples = 1000
data = {
'exit_velocity': np.random.normal(88, 8, n_samples),
'launch_angle': np.random.normal(15, 10, n_samples),
'sprint_speed': np.random.normal(27, 2, n_samples),
'barrel_rate': np.random.uniform(0, 20, n_samples),
'hard_hit_rate': np.random.uniform(30, 50, n_samples),
'k_rate': np.random.uniform(15, 35, n_samples),
'bb_rate': np.random.uniform(5, 15, n_samples),
'pull_rate': np.random.uniform(35, 50, n_samples),
'groundball_rate': np.random.uniform(35, 55, n_samples),
}
df = pd.DataFrame(data)
# Target: wOBA (weighted on-base average)
# Simplified calculation based on features
df['wOBA'] = (0.003 * df['exit_velocity'] +
0.002 * df['launch_angle'] +
0.005 * df['barrel_rate'] +
0.003 * df['hard_hit_rate'] -
0.002 * df['k_rate'] +
0.004 * df['bb_rate'] +
np.random.normal(0, 0.02, n_samples))
df['wOBA'] = df['wOBA'].clip(0.250, 0.450)
return df
# Train a simple model
def train_batter_model():
df = create_batter_features()
# Prepare data
feature_cols = ['exit_velocity', 'launch_angle', 'sprint_speed',
'barrel_rate', 'hard_hit_rate', 'k_rate',
'bb_rate', 'pull_rate', 'groundball_rate']
X = df[feature_cols].values
y = df['wOBA'].values
# Split and scale
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# Convert to PyTorch tensors
X_train_tensor = torch.FloatTensor(X_train)
y_train_tensor = torch.FloatTensor(y_train).reshape(-1, 1)
X_test_tensor = torch.FloatTensor(X_test)
y_test_tensor = torch.FloatTensor(y_test).reshape(-1, 1)
# Create model
model = BaseballNN(input_size=9, hidden_sizes=[64, 32, 16], output_size=1)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Training loop
epochs = 100
losses = []
for epoch in range(epochs):
model.train()
optimizer.zero_grad()
# Forward pass
predictions = model(X_train_tensor)
loss = criterion(predictions, y_train_tensor)
# Backward pass
loss.backward()
optimizer.step()
losses.append(loss.item())
if (epoch + 1) % 20 == 0:
print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")
# Evaluation
model.eval()
with torch.no_grad():
test_predictions = model(X_test_tensor)
test_loss = criterion(test_predictions, y_test_tensor)
print(f"\nTest Loss (MSE): {test_loss.item():.4f}")
return model, scaler, losses
print("Training basic neural network for batter performance prediction...")
model, scaler, losses = train_batter_model()
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
# Simple feedforward network for baseball statistics
class BaseballNN(nn.Module):
def __init__(self, input_size, hidden_sizes, output_size):
super(BaseballNN, self).__init__()
self.layers = nn.ModuleList()
# Input layer
self.layers.append(nn.Linear(input_size, hidden_sizes[0]))
# Hidden layers
for i in range(len(hidden_sizes) - 1):
self.layers.append(nn.Linear(hidden_sizes[i], hidden_sizes[i+1]))
# Output layer
self.layers.append(nn.Linear(hidden_sizes[-1], output_size))
self.relu = nn.ReLU()
self.dropout = nn.Dropout(0.3)
def forward(self, x):
for i, layer in enumerate(self.layers[:-1]):
x = self.relu(layer(x))
x = self.dropout(x)
x = self.layers[-1](x)
return x
# Example: Predicting batter performance
def create_batter_features():
"""Create synthetic batter statistics for demonstration"""
np.random.seed(42)
n_samples = 1000
data = {
'exit_velocity': np.random.normal(88, 8, n_samples),
'launch_angle': np.random.normal(15, 10, n_samples),
'sprint_speed': np.random.normal(27, 2, n_samples),
'barrel_rate': np.random.uniform(0, 20, n_samples),
'hard_hit_rate': np.random.uniform(30, 50, n_samples),
'k_rate': np.random.uniform(15, 35, n_samples),
'bb_rate': np.random.uniform(5, 15, n_samples),
'pull_rate': np.random.uniform(35, 50, n_samples),
'groundball_rate': np.random.uniform(35, 55, n_samples),
}
df = pd.DataFrame(data)
# Target: wOBA (weighted on-base average)
# Simplified calculation based on features
df['wOBA'] = (0.003 * df['exit_velocity'] +
0.002 * df['launch_angle'] +
0.005 * df['barrel_rate'] +
0.003 * df['hard_hit_rate'] -
0.002 * df['k_rate'] +
0.004 * df['bb_rate'] +
np.random.normal(0, 0.02, n_samples))
df['wOBA'] = df['wOBA'].clip(0.250, 0.450)
return df
# Train a simple model
def train_batter_model():
df = create_batter_features()
# Prepare data
feature_cols = ['exit_velocity', 'launch_angle', 'sprint_speed',
'barrel_rate', 'hard_hit_rate', 'k_rate',
'bb_rate', 'pull_rate', 'groundball_rate']
X = df[feature_cols].values
y = df['wOBA'].values
# Split and scale
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# Convert to PyTorch tensors
X_train_tensor = torch.FloatTensor(X_train)
y_train_tensor = torch.FloatTensor(y_train).reshape(-1, 1)
X_test_tensor = torch.FloatTensor(X_test)
y_test_tensor = torch.FloatTensor(y_test).reshape(-1, 1)
# Create model
model = BaseballNN(input_size=9, hidden_sizes=[64, 32, 16], output_size=1)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Training loop
epochs = 100
losses = []
for epoch in range(epochs):
model.train()
optimizer.zero_grad()
# Forward pass
predictions = model(X_train_tensor)
loss = criterion(predictions, y_train_tensor)
# Backward pass
loss.backward()
optimizer.step()
losses.append(loss.item())
if (epoch + 1) % 20 == 0:
print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")
# Evaluation
model.eval()
with torch.no_grad():
test_predictions = model(X_test_tensor)
test_loss = criterion(test_predictions, y_test_tensor)
print(f"\nTest Loss (MSE): {test_loss.item():.4f}")
return model, scaler, losses
print("Training basic neural network for batter performance prediction...")
model, scaler, losses = train_batter_model()
Multi-Class Classification for Pitch Results
Predicting pitch outcomes (ball, called strike, swinging strike, foul, hit into play) is a fundamental deep learning task in baseball. We'll build a model that considers pitch characteristics, game context, and batter-pitcher matchup data.
# Pitch outcome prediction with TensorFlow/Keras
class PitchOutcomePredictor:
def __init__(self):
self.model = None
self.scaler = StandardScaler()
self.outcome_encoder = {
'ball': 0,
'called_strike': 1,
'swinging_strike': 2,
'foul': 3,
'in_play': 4
}
def create_pitch_data(self, n_samples=5000):
"""Generate synthetic pitch data"""
np.random.seed(42)
data = {
# Pitch characteristics
'release_speed': np.random.normal(92, 5, n_samples),
'release_spin_rate': np.random.normal(2300, 300, n_samples),
'pfx_x': np.random.normal(0, 8, n_samples), # horizontal movement
'pfx_z': np.random.normal(0, 10, n_samples), # vertical movement
'plate_x': np.random.uniform(-2, 2, n_samples), # horizontal location
'plate_z': np.random.uniform(1, 4, n_samples), # vertical location
'release_extension': np.random.normal(6.2, 0.3, n_samples),
# Count
'balls': np.random.randint(0, 4, n_samples),
'strikes': np.random.randint(0, 3, n_samples),
# Matchup
'pitcher_handedness': np.random.choice([0, 1], n_samples), # 0=L, 1=R
'batter_handedness': np.random.choice([0, 1], n_samples),
'pitch_number': np.random.randint(1, 12, n_samples),
# Pitch type (encoded)
'pitch_type': np.random.choice([0, 1, 2, 3, 4], n_samples), # FF, SL, CH, CU, SI
}
df = pd.DataFrame(data)
# Generate outcomes based on features (simplified logic)
outcomes = []
for idx, row in df.iterrows():
# Distance from strike zone center
distance_from_center = np.sqrt(row['plate_x']**2 + (row['plate_z'] - 2.5)**2)
# Probabilities based on location and movement
if distance_from_center > 1.2: # Outside zone
probs = [0.6, 0.15, 0.1, 0.1, 0.05] # Likely ball
elif distance_from_center < 0.3: # Middle-middle
probs = [0.05, 0.1, 0.15, 0.25, 0.45] # Likely in play
else: # Edge of zone
probs = [0.2, 0.25, 0.2, 0.25, 0.1] # Mixed
outcome = np.random.choice(list(self.outcome_encoder.values()), p=probs)
outcomes.append(outcome)
df['outcome'] = outcomes
return df
def build_model(self, input_dim, num_classes=5):
"""Build deep neural network for pitch outcome prediction"""
model = keras.Sequential([
layers.Input(shape=(input_dim,)),
layers.Dense(128, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(64, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(32, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.2),
layers.Dense(16, activation='relu'),
layers.Dense(num_classes, activation='softmax')
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
return model
def train(self, epochs=50, batch_size=32):
"""Train the pitch outcome model"""
# Generate data
df = self.create_pitch_data(n_samples=10000)
# Prepare features and target
feature_cols = ['release_speed', 'release_spin_rate', 'pfx_x', 'pfx_z',
'plate_x', 'plate_z', 'release_extension', 'balls',
'strikes', 'pitcher_handedness', 'batter_handedness',
'pitch_number', 'pitch_type']
X = df[feature_cols].values
y = df['outcome'].values
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Scale features
X_train = self.scaler.fit_transform(X_train)
X_test = self.scaler.transform(X_test)
# Build model
self.model = self.build_model(input_dim=X_train.shape[1])
# Callbacks
early_stopping = keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=10,
restore_best_weights=True
)
reduce_lr = keras.callbacks.ReduceLROnPlateau(
monitor='val_loss',
factor=0.5,
patience=5,
min_lr=0.00001
)
# Train
history = self.model.fit(
X_train, y_train,
validation_split=0.2,
epochs=epochs,
batch_size=batch_size,
callbacks=[early_stopping, reduce_lr],
verbose=1
)
# Evaluate
test_loss, test_accuracy = self.model.evaluate(X_test, y_test, verbose=0)
print(f"\nTest Accuracy: {test_accuracy:.4f}")
print(f"Test Loss: {test_loss:.4f}")
return history
def predict_outcome(self, pitch_features):
"""Predict outcome for new pitch"""
scaled_features = self.scaler.transform([pitch_features])
probabilities = self.model.predict(scaled_features, verbose=0)[0]
outcome_names = ['Ball', 'Called Strike', 'Swinging Strike', 'Foul', 'In Play']
prediction = {name: prob for name, prob in zip(outcome_names, probabilities)}
return prediction
# Train the model
print("\n" + "="*60)
print("Training Pitch Outcome Prediction Model")
print("="*60)
predictor = PitchOutcomePredictor()
history = predictor.train(epochs=30)
# Example prediction
sample_pitch = [94.5, 2400, -5, 12, 0.2, 2.8, 6.3, 2, 1, 1, 0, 5, 0]
prediction = predictor.predict_outcome(sample_pitch)
print("\nSample Pitch Prediction:")
for outcome, prob in prediction.items():
print(f"{outcome}: {prob:.3f}")
Advanced Architecture: Embedding Layers for Categorical Variables
For pitch type classification, we can use embedding layers to learn representations of categorical variables like pitcher ID, batter ID, and pitch type.
def create_pitch_classifier_with_embeddings():
"""Advanced pitch outcome model with embeddings"""
# Input layers for different feature types
# Continuous features
continuous_input = layers.Input(shape=(10,), name='continuous_features')
# Categorical features (pitcher, batter, etc.)
pitcher_input = layers.Input(shape=(1,), name='pitcher_id')
batter_input = layers.Input(shape=(1,), name='batter_id')
pitch_type_input = layers.Input(shape=(1,), name='pitch_type')
# Embedding layers
pitcher_embedding = layers.Embedding(
input_dim=500, # number of pitchers
output_dim=16,
name='pitcher_embedding'
)(pitcher_input)
pitcher_embedding = layers.Flatten()(pitcher_embedding)
batter_embedding = layers.Embedding(
input_dim=500, # number of batters
output_dim=16,
name='batter_embedding'
)(batter_input)
batter_embedding = layers.Flatten()(batter_embedding)
pitch_type_embedding = layers.Embedding(
input_dim=10, # number of pitch types
output_dim=8,
name='pitch_type_embedding'
)(pitch_type_input)
pitch_type_embedding = layers.Flatten()(pitch_type_embedding)
# Concatenate all features
concatenated = layers.Concatenate()([
continuous_input,
pitcher_embedding,
batter_embedding,
pitch_type_embedding
])
# Deep network
x = layers.Dense(128, activation='relu')(concatenated)
x = layers.BatchNormalization()(x)
x = layers.Dropout(0.3)(x)
x = layers.Dense(64, activation='relu')(x)
x = layers.BatchNormalization()(x)
x = layers.Dropout(0.3)(x)
x = layers.Dense(32, activation='relu')(x)
x = layers.Dropout(0.2)(x)
# Output layer
output = layers.Dense(5, activation='softmax', name='outcome')(x)
# Create model
model = keras.Model(
inputs=[continuous_input, pitcher_input, batter_input, pitch_type_input],
outputs=output
)
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
return model
print("\nCreated advanced pitch classifier with embeddings")
advanced_model = create_pitch_classifier_with_embeddings()
print(advanced_model.summary())
# Pitch outcome prediction with TensorFlow/Keras
class PitchOutcomePredictor:
def __init__(self):
self.model = None
self.scaler = StandardScaler()
self.outcome_encoder = {
'ball': 0,
'called_strike': 1,
'swinging_strike': 2,
'foul': 3,
'in_play': 4
}
def create_pitch_data(self, n_samples=5000):
"""Generate synthetic pitch data"""
np.random.seed(42)
data = {
# Pitch characteristics
'release_speed': np.random.normal(92, 5, n_samples),
'release_spin_rate': np.random.normal(2300, 300, n_samples),
'pfx_x': np.random.normal(0, 8, n_samples), # horizontal movement
'pfx_z': np.random.normal(0, 10, n_samples), # vertical movement
'plate_x': np.random.uniform(-2, 2, n_samples), # horizontal location
'plate_z': np.random.uniform(1, 4, n_samples), # vertical location
'release_extension': np.random.normal(6.2, 0.3, n_samples),
# Count
'balls': np.random.randint(0, 4, n_samples),
'strikes': np.random.randint(0, 3, n_samples),
# Matchup
'pitcher_handedness': np.random.choice([0, 1], n_samples), # 0=L, 1=R
'batter_handedness': np.random.choice([0, 1], n_samples),
'pitch_number': np.random.randint(1, 12, n_samples),
# Pitch type (encoded)
'pitch_type': np.random.choice([0, 1, 2, 3, 4], n_samples), # FF, SL, CH, CU, SI
}
df = pd.DataFrame(data)
# Generate outcomes based on features (simplified logic)
outcomes = []
for idx, row in df.iterrows():
# Distance from strike zone center
distance_from_center = np.sqrt(row['plate_x']**2 + (row['plate_z'] - 2.5)**2)
# Probabilities based on location and movement
if distance_from_center > 1.2: # Outside zone
probs = [0.6, 0.15, 0.1, 0.1, 0.05] # Likely ball
elif distance_from_center < 0.3: # Middle-middle
probs = [0.05, 0.1, 0.15, 0.25, 0.45] # Likely in play
else: # Edge of zone
probs = [0.2, 0.25, 0.2, 0.25, 0.1] # Mixed
outcome = np.random.choice(list(self.outcome_encoder.values()), p=probs)
outcomes.append(outcome)
df['outcome'] = outcomes
return df
def build_model(self, input_dim, num_classes=5):
"""Build deep neural network for pitch outcome prediction"""
model = keras.Sequential([
layers.Input(shape=(input_dim,)),
layers.Dense(128, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(64, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(32, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.2),
layers.Dense(16, activation='relu'),
layers.Dense(num_classes, activation='softmax')
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
return model
def train(self, epochs=50, batch_size=32):
"""Train the pitch outcome model"""
# Generate data
df = self.create_pitch_data(n_samples=10000)
# Prepare features and target
feature_cols = ['release_speed', 'release_spin_rate', 'pfx_x', 'pfx_z',
'plate_x', 'plate_z', 'release_extension', 'balls',
'strikes', 'pitcher_handedness', 'batter_handedness',
'pitch_number', 'pitch_type']
X = df[feature_cols].values
y = df['outcome'].values
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# Scale features
X_train = self.scaler.fit_transform(X_train)
X_test = self.scaler.transform(X_test)
# Build model
self.model = self.build_model(input_dim=X_train.shape[1])
# Callbacks
early_stopping = keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=10,
restore_best_weights=True
)
reduce_lr = keras.callbacks.ReduceLROnPlateau(
monitor='val_loss',
factor=0.5,
patience=5,
min_lr=0.00001
)
# Train
history = self.model.fit(
X_train, y_train,
validation_split=0.2,
epochs=epochs,
batch_size=batch_size,
callbacks=[early_stopping, reduce_lr],
verbose=1
)
# Evaluate
test_loss, test_accuracy = self.model.evaluate(X_test, y_test, verbose=0)
print(f"\nTest Accuracy: {test_accuracy:.4f}")
print(f"Test Loss: {test_loss:.4f}")
return history
def predict_outcome(self, pitch_features):
"""Predict outcome for new pitch"""
scaled_features = self.scaler.transform([pitch_features])
probabilities = self.model.predict(scaled_features, verbose=0)[0]
outcome_names = ['Ball', 'Called Strike', 'Swinging Strike', 'Foul', 'In Play']
prediction = {name: prob for name, prob in zip(outcome_names, probabilities)}
return prediction
# Train the model
print("\n" + "="*60)
print("Training Pitch Outcome Prediction Model")
print("="*60)
predictor = PitchOutcomePredictor()
history = predictor.train(epochs=30)
# Example prediction
sample_pitch = [94.5, 2400, -5, 12, 0.2, 2.8, 6.3, 2, 1, 1, 0, 5, 0]
prediction = predictor.predict_outcome(sample_pitch)
print("\nSample Pitch Prediction:")
for outcome, prob in prediction.items():
print(f"{outcome}: {prob:.3f}")
def create_pitch_classifier_with_embeddings():
"""Advanced pitch outcome model with embeddings"""
# Input layers for different feature types
# Continuous features
continuous_input = layers.Input(shape=(10,), name='continuous_features')
# Categorical features (pitcher, batter, etc.)
pitcher_input = layers.Input(shape=(1,), name='pitcher_id')
batter_input = layers.Input(shape=(1,), name='batter_id')
pitch_type_input = layers.Input(shape=(1,), name='pitch_type')
# Embedding layers
pitcher_embedding = layers.Embedding(
input_dim=500, # number of pitchers
output_dim=16,
name='pitcher_embedding'
)(pitcher_input)
pitcher_embedding = layers.Flatten()(pitcher_embedding)
batter_embedding = layers.Embedding(
input_dim=500, # number of batters
output_dim=16,
name='batter_embedding'
)(batter_input)
batter_embedding = layers.Flatten()(batter_embedding)
pitch_type_embedding = layers.Embedding(
input_dim=10, # number of pitch types
output_dim=8,
name='pitch_type_embedding'
)(pitch_type_input)
pitch_type_embedding = layers.Flatten()(pitch_type_embedding)
# Concatenate all features
concatenated = layers.Concatenate()([
continuous_input,
pitcher_embedding,
batter_embedding,
pitch_type_embedding
])
# Deep network
x = layers.Dense(128, activation='relu')(concatenated)
x = layers.BatchNormalization()(x)
x = layers.Dropout(0.3)(x)
x = layers.Dense(64, activation='relu')(x)
x = layers.BatchNormalization()(x)
x = layers.Dropout(0.3)(x)
x = layers.Dense(32, activation='relu')(x)
x = layers.Dropout(0.2)(x)
# Output layer
output = layers.Dense(5, activation='softmax', name='outcome')(x)
# Create model
model = keras.Model(
inputs=[continuous_input, pitcher_input, batter_input, pitch_type_input],
outputs=output
)
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
return model
print("\nCreated advanced pitch classifier with embeddings")
advanced_model = create_pitch_classifier_with_embeddings()
print(advanced_model.summary())
Time Series Analysis with Recurrent Networks
Long Short-Term Memory (LSTM) networks excel at sequential data, making them perfect for forecasting player performance over time. We'll build a model that predicts a player's next-game performance based on recent history.
class PlayerPerformanceLSTM:
def __init__(self, sequence_length=10):
self.sequence_length = sequence_length
self.model = None
self.scaler = StandardScaler()
def create_player_timeseries(self, n_games=162, n_players=50):
"""Generate synthetic player performance time series"""
np.random.seed(42)
all_data = []
for player_id in range(n_players):
# Player skill level (affects baseline performance)
skill = np.random.uniform(0.6, 1.4)
# Seasonal trend
games = np.arange(n_games)
seasonal_trend = 0.1 * np.sin(2 * np.pi * games / n_games)
# Random walk component
random_walk = np.cumsum(np.random.normal(0, 0.05, n_games))
random_walk = random_walk - random_walk.mean()
# Performance metrics
data = {
'player_id': player_id,
'game_number': games,
'hits': np.random.poisson(skill * 1.2 + seasonal_trend + random_walk, n_games),
'at_bats': np.random.poisson(4, n_games),
'walks': np.random.poisson(skill * 0.3, n_games),
'strikeouts': np.random.poisson((2 - skill) * 0.8, n_games),
'home_runs': np.random.poisson(skill * 0.15, n_games),
}
player_df = pd.DataFrame(data)
# Calculate batting average and OPS for each game
player_df['batting_avg'] = player_df['hits'] / player_df['at_bats'].clip(lower=1)
player_df['obp'] = (player_df['hits'] + player_df['walks']) / \
(player_df['at_bats'] + player_df['walks']).clip(lower=1)
player_df['slg'] = (player_df['hits'] + player_df['home_runs'] * 3) / \
player_df['at_bats'].clip(lower=1)
player_df['ops'] = player_df['obp'] + player_df['slg']
# Rolling averages (last 10 games)
player_df['rolling_avg'] = player_df['batting_avg'].rolling(10, min_periods=1).mean()
player_df['rolling_ops'] = player_df['ops'].rolling(10, min_periods=1).mean()
all_data.append(player_df)
return pd.concat(all_data, ignore_index=True)
def prepare_sequences(self, data, target_col='ops'):
"""Create sequences for LSTM training"""
feature_cols = ['batting_avg', 'obp', 'slg', 'ops',
'rolling_avg', 'rolling_ops', 'hits',
'walks', 'strikeouts', 'home_runs']
sequences = []
targets = []
# Group by player
for player_id in data['player_id'].unique():
player_data = data[data['player_id'] == player_id][feature_cols].values
# Create sequences
for i in range(len(player_data) - self.sequence_length):
sequences.append(player_data[i:i + self.sequence_length])
targets.append(player_data[i + self.sequence_length][3]) # OPS
return np.array(sequences), np.array(targets)
def build_lstm_model(self, input_shape):
"""Build LSTM model for performance forecasting"""
model = keras.Sequential([
layers.LSTM(64, return_sequences=True, input_shape=input_shape),
layers.Dropout(0.3),
layers.LSTM(32, return_sequences=True),
layers.Dropout(0.3),
layers.LSTM(16),
layers.Dropout(0.2),
layers.Dense(8, activation='relu'),
layers.Dense(1) # Predict next game OPS
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='mse',
metrics=['mae']
)
return model
def train(self, epochs=50, batch_size=32):
"""Train LSTM model"""
print("Generating player time series data...")
data = self.create_player_timeseries(n_games=162, n_players=100)
print("Preparing sequences...")
X, y = self.prepare_sequences(data)
# Split data
split_idx = int(0.8 * len(X))
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
# Scale each feature independently
n_samples, n_timesteps, n_features = X_train.shape
X_train_reshaped = X_train.reshape(-1, n_features)
X_test_reshaped = X_test.reshape(-1, n_features)
X_train_scaled = self.scaler.fit_transform(X_train_reshaped)
X_test_scaled = self.scaler.transform(X_test_reshaped)
X_train = X_train_scaled.reshape(n_samples, n_timesteps, n_features)
X_test = X_test_scaled.reshape(-1, n_timesteps, n_features)
print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")
# Build model
self.model = self.build_lstm_model(input_shape=(n_timesteps, n_features))
# Callbacks
early_stopping = keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=15,
restore_best_weights=True
)
# Train
history = self.model.fit(
X_train, y_train,
validation_data=(X_test, y_test),
epochs=epochs,
batch_size=batch_size,
callbacks=[early_stopping],
verbose=1
)
# Evaluate
test_loss, test_mae = self.model.evaluate(X_test, y_test, verbose=0)
print(f"\nTest MAE: {test_mae:.4f}")
print(f"Test MSE: {test_loss:.4f}")
return history
def forecast_performance(self, recent_games):
"""Forecast next game performance"""
scaled_games = self.scaler.transform(recent_games)
scaled_games = scaled_games.reshape(1, self.sequence_length, -1)
prediction = self.model.predict(scaled_games, verbose=0)[0][0]
return prediction
# Train LSTM model
print("\n" + "="*60)
print("Training LSTM for Player Performance Forecasting")
print("="*60)
lstm_model = PlayerPerformanceLSTM(sequence_length=10)
lstm_history = lstm_model.train(epochs=30)
Bidirectional LSTM for Enhanced Context
Bidirectional LSTMs process sequences in both forward and backward directions, capturing more context.
def build_bidirectional_lstm(input_shape):
"""Bidirectional LSTM for player performance"""
model = keras.Sequential([
layers.Bidirectional(layers.LSTM(64, return_sequences=True),
input_shape=input_shape),
layers.Dropout(0.3),
layers.Bidirectional(layers.LSTM(32)),
layers.Dropout(0.3),
layers.Dense(16, activation='relu'),
layers.Dense(1)
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='mse',
metrics=['mae']
)
return model
print("\nBidirectional LSTM model created")
class PlayerPerformanceLSTM:
def __init__(self, sequence_length=10):
self.sequence_length = sequence_length
self.model = None
self.scaler = StandardScaler()
def create_player_timeseries(self, n_games=162, n_players=50):
"""Generate synthetic player performance time series"""
np.random.seed(42)
all_data = []
for player_id in range(n_players):
# Player skill level (affects baseline performance)
skill = np.random.uniform(0.6, 1.4)
# Seasonal trend
games = np.arange(n_games)
seasonal_trend = 0.1 * np.sin(2 * np.pi * games / n_games)
# Random walk component
random_walk = np.cumsum(np.random.normal(0, 0.05, n_games))
random_walk = random_walk - random_walk.mean()
# Performance metrics
data = {
'player_id': player_id,
'game_number': games,
'hits': np.random.poisson(skill * 1.2 + seasonal_trend + random_walk, n_games),
'at_bats': np.random.poisson(4, n_games),
'walks': np.random.poisson(skill * 0.3, n_games),
'strikeouts': np.random.poisson((2 - skill) * 0.8, n_games),
'home_runs': np.random.poisson(skill * 0.15, n_games),
}
player_df = pd.DataFrame(data)
# Calculate batting average and OPS for each game
player_df['batting_avg'] = player_df['hits'] / player_df['at_bats'].clip(lower=1)
player_df['obp'] = (player_df['hits'] + player_df['walks']) / \
(player_df['at_bats'] + player_df['walks']).clip(lower=1)
player_df['slg'] = (player_df['hits'] + player_df['home_runs'] * 3) / \
player_df['at_bats'].clip(lower=1)
player_df['ops'] = player_df['obp'] + player_df['slg']
# Rolling averages (last 10 games)
player_df['rolling_avg'] = player_df['batting_avg'].rolling(10, min_periods=1).mean()
player_df['rolling_ops'] = player_df['ops'].rolling(10, min_periods=1).mean()
all_data.append(player_df)
return pd.concat(all_data, ignore_index=True)
def prepare_sequences(self, data, target_col='ops'):
"""Create sequences for LSTM training"""
feature_cols = ['batting_avg', 'obp', 'slg', 'ops',
'rolling_avg', 'rolling_ops', 'hits',
'walks', 'strikeouts', 'home_runs']
sequences = []
targets = []
# Group by player
for player_id in data['player_id'].unique():
player_data = data[data['player_id'] == player_id][feature_cols].values
# Create sequences
for i in range(len(player_data) - self.sequence_length):
sequences.append(player_data[i:i + self.sequence_length])
targets.append(player_data[i + self.sequence_length][3]) # OPS
return np.array(sequences), np.array(targets)
def build_lstm_model(self, input_shape):
"""Build LSTM model for performance forecasting"""
model = keras.Sequential([
layers.LSTM(64, return_sequences=True, input_shape=input_shape),
layers.Dropout(0.3),
layers.LSTM(32, return_sequences=True),
layers.Dropout(0.3),
layers.LSTM(16),
layers.Dropout(0.2),
layers.Dense(8, activation='relu'),
layers.Dense(1) # Predict next game OPS
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='mse',
metrics=['mae']
)
return model
def train(self, epochs=50, batch_size=32):
"""Train LSTM model"""
print("Generating player time series data...")
data = self.create_player_timeseries(n_games=162, n_players=100)
print("Preparing sequences...")
X, y = self.prepare_sequences(data)
# Split data
split_idx = int(0.8 * len(X))
X_train, X_test = X[:split_idx], X[split_idx:]
y_train, y_test = y[:split_idx], y[split_idx:]
# Scale each feature independently
n_samples, n_timesteps, n_features = X_train.shape
X_train_reshaped = X_train.reshape(-1, n_features)
X_test_reshaped = X_test.reshape(-1, n_features)
X_train_scaled = self.scaler.fit_transform(X_train_reshaped)
X_test_scaled = self.scaler.transform(X_test_reshaped)
X_train = X_train_scaled.reshape(n_samples, n_timesteps, n_features)
X_test = X_test_scaled.reshape(-1, n_timesteps, n_features)
print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")
# Build model
self.model = self.build_lstm_model(input_shape=(n_timesteps, n_features))
# Callbacks
early_stopping = keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=15,
restore_best_weights=True
)
# Train
history = self.model.fit(
X_train, y_train,
validation_data=(X_test, y_test),
epochs=epochs,
batch_size=batch_size,
callbacks=[early_stopping],
verbose=1
)
# Evaluate
test_loss, test_mae = self.model.evaluate(X_test, y_test, verbose=0)
print(f"\nTest MAE: {test_mae:.4f}")
print(f"Test MSE: {test_loss:.4f}")
return history
def forecast_performance(self, recent_games):
"""Forecast next game performance"""
scaled_games = self.scaler.transform(recent_games)
scaled_games = scaled_games.reshape(1, self.sequence_length, -1)
prediction = self.model.predict(scaled_games, verbose=0)[0][0]
return prediction
# Train LSTM model
print("\n" + "="*60)
print("Training LSTM for Player Performance Forecasting")
print("="*60)
lstm_model = PlayerPerformanceLSTM(sequence_length=10)
lstm_history = lstm_model.train(epochs=30)
def build_bidirectional_lstm(input_shape):
"""Bidirectional LSTM for player performance"""
model = keras.Sequential([
layers.Bidirectional(layers.LSTM(64, return_sequences=True),
input_shape=input_shape),
layers.Dropout(0.3),
layers.Bidirectional(layers.LSTM(32)),
layers.Dropout(0.3),
layers.Dense(16, activation='relu'),
layers.Dense(1)
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='mse',
metrics=['mae']
)
return model
print("\nBidirectional LSTM model created")
Swing Analysis with Convolutional Neural Networks
Computer vision enables automated analysis of player movements, pitch tracking, and defensive positioning. We'll explore CNNs for swing classification and pose estimation.
class SwingAnalyzer:
"""CNN-based swing analysis system"""
def __init__(self, img_height=224, img_width=224):
self.img_height = img_height
self.img_width = img_width
self.model = None
def create_synthetic_swing_data(self, n_samples=1000):
"""
Generate synthetic swing features
In practice, these would come from video frame analysis
"""
np.random.seed(42)
# Swing types: good, long, uppercut, level, ground_ball
swing_types = ['good', 'long', 'uppercut', 'level', 'ground_ball']
data = []
for swing_type in swing_types:
n = n_samples // len(swing_types)
if swing_type == 'good':
bat_speed = np.random.normal(75, 3, n)
attack_angle = np.random.normal(12, 3, n)
time_to_contact = np.random.normal(0.16, 0.01, n)
vertical_bat_angle = np.random.normal(25, 5, n)
elif swing_type == 'long':
bat_speed = np.random.normal(70, 4, n)
attack_angle = np.random.normal(10, 4, n)
time_to_contact = np.random.normal(0.19, 0.02, n)
vertical_bat_angle = np.random.normal(30, 6, n)
elif swing_type == 'uppercut':
bat_speed = np.random.normal(73, 3, n)
attack_angle = np.random.normal(18, 3, n)
time_to_contact = np.random.normal(0.17, 0.01, n)
vertical_bat_angle = np.random.normal(35, 5, n)
elif swing_type == 'level':
bat_speed = np.random.normal(74, 3, n)
attack_angle = np.random.normal(8, 2, n)
time_to_contact = np.random.normal(0.16, 0.01, n)
vertical_bat_angle = np.random.normal(20, 4, n)
else: # ground_ball
bat_speed = np.random.normal(71, 4, n)
attack_angle = np.random.normal(-5, 3, n)
time_to_contact = np.random.normal(0.18, 0.02, n)
vertical_bat_angle = np.random.normal(15, 5, n)
for i in range(n):
data.append({
'bat_speed': bat_speed[i],
'attack_angle': attack_angle[i],
'time_to_contact': time_to_contact[i],
'vertical_bat_angle': vertical_bat_angle[i],
'swing_type': swing_type
})
return pd.DataFrame(data)
def build_swing_classifier(self, input_dim=4, num_classes=5):
"""Build classifier for swing analysis"""
model = keras.Sequential([
layers.Input(shape=(input_dim,)),
layers.Dense(128, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.4),
layers.Dense(64, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(32, activation='relu'),
layers.Dropout(0.2),
layers.Dense(num_classes, activation='softmax')
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
return model
def train(self, epochs=40):
"""Train swing classification model"""
df = self.create_synthetic_swing_data(n_samples=2000)
# Encode swing types
swing_encoder = {
'good': 0, 'long': 1, 'uppercut': 2,
'level': 3, 'ground_ball': 4
}
df['swing_type_encoded'] = df['swing_type'].map(swing_encoder)
# Prepare data
feature_cols = ['bat_speed', 'attack_angle', 'time_to_contact',
'vertical_bat_angle']
X = df[feature_cols].values
y = df['swing_type_encoded'].values
# Split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Scale
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# Build and train
self.model = self.build_swing_classifier()
history = self.model.fit(
X_train, y_train,
validation_split=0.2,
epochs=epochs,
batch_size=32,
verbose=1
)
# Evaluate
test_loss, test_acc = self.model.evaluate(X_test, y_test, verbose=0)
print(f"\nTest Accuracy: {test_acc:.4f}")
return history, scaler, swing_encoder
# Train swing analyzer
print("\n" + "="*60)
print("Training Swing Analysis Model")
print("="*60)
swing_analyzer = SwingAnalyzer()
swing_history, swing_scaler, swing_encoder = swing_analyzer.train(epochs=30)
CNN Architecture for Video Frame Analysis
For actual video analysis, we would use convolutional layers to process frames:
def build_video_frame_cnn(img_height=224, img_width=224):
"""
CNN for analyzing swing video frames
Input: Video frames of baseball swings
Output: Swing classification or pose keypoints
"""
model = keras.Sequential([
# Convolutional blocks
layers.Conv2D(32, (3, 3), activation='relu',
input_shape=(img_height, img_width, 3)),
layers.BatchNormalization(),
layers.MaxPooling2D((2, 2)),
layers.Conv2D(64, (3, 3), activation='relu'),
layers.BatchNormalization(),
layers.MaxPooling2D((2, 2)),
layers.Conv2D(128, (3, 3), activation='relu'),
layers.BatchNormalization(),
layers.MaxPooling2D((2, 2)),
layers.Conv2D(256, (3, 3), activation='relu'),
layers.BatchNormalization(),
layers.MaxPooling2D((2, 2)),
# Flatten and dense layers
layers.Flatten(),
layers.Dense(512, activation='relu'),
layers.Dropout(0.5),
layers.Dense(256, activation='relu'),
layers.Dropout(0.3),
# Output layer (5 swing classes)
layers.Dense(5, activation='softmax')
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.0001),
loss='categorical_crossentropy',
metrics=['accuracy']
)
return model
print("\nCNN for video frame analysis created")
video_cnn = build_video_frame_cnn()
print(f"Total parameters: {video_cnn.count_params():,}")
Transfer Learning for Pitch Recognition
Using pre-trained models like ResNet or EfficientNet for pitch type recognition from broadcast video:
def build_pitch_recognition_model():
"""
Transfer learning model for pitch type recognition
Uses pre-trained ResNet50 as feature extractor
"""
# Load pre-trained ResNet50
base_model = keras.applications.ResNet50(
weights='imagenet',
include_top=False,
input_shape=(224, 224, 3)
)
# Freeze base model layers
base_model.trainable = False
# Add custom classification layers
model = keras.Sequential([
base_model,
layers.GlobalAveragePooling2D(),
layers.Dense(256, activation='relu'),
layers.Dropout(0.5),
layers.Dense(128, activation='relu'),
layers.Dropout(0.3),
layers.Dense(10, activation='softmax') # 10 pitch types
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.0001),
loss='categorical_crossentropy',
metrics=['accuracy']
)
return model
print("\nTransfer learning model for pitch recognition created")
class SwingAnalyzer:
"""CNN-based swing analysis system"""
def __init__(self, img_height=224, img_width=224):
self.img_height = img_height
self.img_width = img_width
self.model = None
def create_synthetic_swing_data(self, n_samples=1000):
"""
Generate synthetic swing features
In practice, these would come from video frame analysis
"""
np.random.seed(42)
# Swing types: good, long, uppercut, level, ground_ball
swing_types = ['good', 'long', 'uppercut', 'level', 'ground_ball']
data = []
for swing_type in swing_types:
n = n_samples // len(swing_types)
if swing_type == 'good':
bat_speed = np.random.normal(75, 3, n)
attack_angle = np.random.normal(12, 3, n)
time_to_contact = np.random.normal(0.16, 0.01, n)
vertical_bat_angle = np.random.normal(25, 5, n)
elif swing_type == 'long':
bat_speed = np.random.normal(70, 4, n)
attack_angle = np.random.normal(10, 4, n)
time_to_contact = np.random.normal(0.19, 0.02, n)
vertical_bat_angle = np.random.normal(30, 6, n)
elif swing_type == 'uppercut':
bat_speed = np.random.normal(73, 3, n)
attack_angle = np.random.normal(18, 3, n)
time_to_contact = np.random.normal(0.17, 0.01, n)
vertical_bat_angle = np.random.normal(35, 5, n)
elif swing_type == 'level':
bat_speed = np.random.normal(74, 3, n)
attack_angle = np.random.normal(8, 2, n)
time_to_contact = np.random.normal(0.16, 0.01, n)
vertical_bat_angle = np.random.normal(20, 4, n)
else: # ground_ball
bat_speed = np.random.normal(71, 4, n)
attack_angle = np.random.normal(-5, 3, n)
time_to_contact = np.random.normal(0.18, 0.02, n)
vertical_bat_angle = np.random.normal(15, 5, n)
for i in range(n):
data.append({
'bat_speed': bat_speed[i],
'attack_angle': attack_angle[i],
'time_to_contact': time_to_contact[i],
'vertical_bat_angle': vertical_bat_angle[i],
'swing_type': swing_type
})
return pd.DataFrame(data)
def build_swing_classifier(self, input_dim=4, num_classes=5):
"""Build classifier for swing analysis"""
model = keras.Sequential([
layers.Input(shape=(input_dim,)),
layers.Dense(128, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.4),
layers.Dense(64, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(32, activation='relu'),
layers.Dropout(0.2),
layers.Dense(num_classes, activation='softmax')
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
return model
def train(self, epochs=40):
"""Train swing classification model"""
df = self.create_synthetic_swing_data(n_samples=2000)
# Encode swing types
swing_encoder = {
'good': 0, 'long': 1, 'uppercut': 2,
'level': 3, 'ground_ball': 4
}
df['swing_type_encoded'] = df['swing_type'].map(swing_encoder)
# Prepare data
feature_cols = ['bat_speed', 'attack_angle', 'time_to_contact',
'vertical_bat_angle']
X = df[feature_cols].values
y = df['swing_type_encoded'].values
# Split
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Scale
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)
# Build and train
self.model = self.build_swing_classifier()
history = self.model.fit(
X_train, y_train,
validation_split=0.2,
epochs=epochs,
batch_size=32,
verbose=1
)
# Evaluate
test_loss, test_acc = self.model.evaluate(X_test, y_test, verbose=0)
print(f"\nTest Accuracy: {test_acc:.4f}")
return history, scaler, swing_encoder
# Train swing analyzer
print("\n" + "="*60)
print("Training Swing Analysis Model")
print("="*60)
swing_analyzer = SwingAnalyzer()
swing_history, swing_scaler, swing_encoder = swing_analyzer.train(epochs=30)
def build_video_frame_cnn(img_height=224, img_width=224):
"""
CNN for analyzing swing video frames
Input: Video frames of baseball swings
Output: Swing classification or pose keypoints
"""
model = keras.Sequential([
# Convolutional blocks
layers.Conv2D(32, (3, 3), activation='relu',
input_shape=(img_height, img_width, 3)),
layers.BatchNormalization(),
layers.MaxPooling2D((2, 2)),
layers.Conv2D(64, (3, 3), activation='relu'),
layers.BatchNormalization(),
layers.MaxPooling2D((2, 2)),
layers.Conv2D(128, (3, 3), activation='relu'),
layers.BatchNormalization(),
layers.MaxPooling2D((2, 2)),
layers.Conv2D(256, (3, 3), activation='relu'),
layers.BatchNormalization(),
layers.MaxPooling2D((2, 2)),
# Flatten and dense layers
layers.Flatten(),
layers.Dense(512, activation='relu'),
layers.Dropout(0.5),
layers.Dense(256, activation='relu'),
layers.Dropout(0.3),
# Output layer (5 swing classes)
layers.Dense(5, activation='softmax')
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.0001),
loss='categorical_crossentropy',
metrics=['accuracy']
)
return model
print("\nCNN for video frame analysis created")
video_cnn = build_video_frame_cnn()
print(f"Total parameters: {video_cnn.count_params():,}")
def build_pitch_recognition_model():
"""
Transfer learning model for pitch type recognition
Uses pre-trained ResNet50 as feature extractor
"""
# Load pre-trained ResNet50
base_model = keras.applications.ResNet50(
weights='imagenet',
include_top=False,
input_shape=(224, 224, 3)
)
# Freeze base model layers
base_model.trainable = False
# Add custom classification layers
model = keras.Sequential([
base_model,
layers.GlobalAveragePooling2D(),
layers.Dense(256, activation='relu'),
layers.Dropout(0.5),
layers.Dense(128, activation='relu'),
layers.Dropout(0.3),
layers.Dense(10, activation='softmax') # 10 pitch types
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.0001),
loss='categorical_crossentropy',
metrics=['accuracy']
)
return model
print("\nTransfer learning model for pitch recognition created")
Physics-Informed Neural Networks for Ball Flight
Traditional physics models use differential equations to model ball flight. Neural networks can learn these patterns directly from data while incorporating physical constraints.
class TrajectoryPredictor:
"""Neural network for baseball trajectory prediction"""
def __init__(self):
self.model = None
self.scaler_X = StandardScaler()
self.scaler_y = StandardScaler()
def generate_trajectory_data(self, n_samples=5000):
"""
Generate synthetic trajectory data based on physics
Simplified model of batted ball flight
"""
np.random.seed(42)
data = []
for _ in range(n_samples):
# Initial conditions
exit_velocity = np.random.uniform(70, 110) # mph
launch_angle = np.random.uniform(-10, 50) # degrees
spray_angle = np.random.uniform(-45, 45) # degrees
spin_rate = np.random.uniform(1000, 3500) # rpm
# Convert to SI units for physics calculation
v0 = exit_velocity * 0.44704 # mph to m/s
theta = np.radians(launch_angle)
phi = np.radians(spray_angle)
# Simplified trajectory calculation (ignoring Magnus force for simplicity)
g = 9.81 # gravity
# Time of flight (simplified)
if theta > 0:
t_flight = 2 * v0 * np.sin(theta) / g
else:
t_flight = 0.1
# Landing distance (simplified)
distance = (v0**2 * np.sin(2 * theta) / g) * 0.3048 # meters to feet
distance = max(0, distance + np.random.normal(0, 10)) # Add noise
# Maximum height
max_height = (v0**2 * np.sin(theta)**2) / (2 * g) * 3.281 # meters to feet
max_height = max(0, max_height + np.random.normal(0, 5))
# Hang time
hang_time = t_flight + np.random.normal(0, 0.1)
hang_time = max(0, hang_time)
# Landing location (x, y) on field
landing_x = distance * np.cos(phi)
landing_y = distance * np.sin(phi)
data.append({
'exit_velocity': exit_velocity,
'launch_angle': launch_angle,
'spray_angle': spray_angle,
'spin_rate': spin_rate,
'distance': distance,
'max_height': max_height,
'hang_time': hang_time,
'landing_x': landing_x,
'landing_y': landing_y
})
return pd.DataFrame(data)
def build_trajectory_model(self, input_dim=4):
"""
Build neural network for trajectory prediction
Multiple outputs: distance, max_height, hang_time, landing coordinates
"""
inputs = layers.Input(shape=(input_dim,))
# Shared layers
x = layers.Dense(128, activation='relu')(inputs)
x = layers.BatchNormalization()(x)
x = layers.Dropout(0.3)(x)
x = layers.Dense(64, activation='relu')(x)
x = layers.BatchNormalization()(x)
x = layers.Dropout(0.3)(x)
x = layers.Dense(32, activation='relu')(x)
x = layers.Dropout(0.2)(x)
# Multiple output heads
distance_output = layers.Dense(16, activation='relu')(x)
distance_output = layers.Dense(1, name='distance')(distance_output)
height_output = layers.Dense(16, activation='relu')(x)
height_output = layers.Dense(1, name='max_height')(height_output)
time_output = layers.Dense(16, activation='relu')(x)
time_output = layers.Dense(1, name='hang_time')(time_output)
location_output = layers.Dense(16, activation='relu')(x)
location_output = layers.Dense(2, name='landing_location')(location_output)
# Create model
model = keras.Model(
inputs=inputs,
outputs=[distance_output, height_output, time_output, location_output]
)
# Compile with different losses for each output
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss={
'distance': 'mse',
'max_height': 'mse',
'hang_time': 'mse',
'landing_location': 'mse'
},
loss_weights={
'distance': 1.0,
'max_height': 0.5,
'hang_time': 0.5,
'landing_location': 0.8
},
metrics=['mae']
)
return model
def train(self, epochs=50):
"""Train trajectory prediction model"""
print("Generating trajectory data...")
df = self.generate_trajectory_data(n_samples=10000)
# Prepare data
input_cols = ['exit_velocity', 'launch_angle', 'spray_angle', 'spin_rate']
X = df[input_cols].values
y_distance = df['distance'].values.reshape(-1, 1)
y_height = df['max_height'].values.reshape(-1, 1)
y_time = df['hang_time'].values.reshape(-1, 1)
y_location = df[['landing_x', 'landing_y']].values
# Split data
split_idx = int(0.8 * len(X))
X_train, X_test = X[:split_idx], X[split_idx:]
y_dist_train, y_dist_test = y_distance[:split_idx], y_distance[split_idx:]
y_height_train, y_height_test = y_height[:split_idx], y_height[split_idx:]
y_time_train, y_time_test = y_time[:split_idx], y_time[split_idx:]
y_loc_train, y_loc_test = y_location[:split_idx], y_location[split_idx:]
# Scale features
X_train = self.scaler_X.fit_transform(X_train)
X_test = self.scaler_X.transform(X_test)
# Build model
self.model = self.build_trajectory_model(input_dim=4)
# Train
history = self.model.fit(
X_train,
{
'distance': y_dist_train,
'max_height': y_height_train,
'hang_time': y_time_train,
'landing_location': y_loc_train
},
validation_data=(
X_test,
{
'distance': y_dist_test,
'max_height': y_height_test,
'hang_time': y_time_test,
'landing_location': y_loc_test
}
),
epochs=epochs,
batch_size=32,
verbose=1
)
print("\nEvaluating model...")
self.model.evaluate(
X_test,
{
'distance': y_dist_test,
'max_height': y_height_test,
'hang_time': y_time_test,
'landing_location': y_loc_test
},
verbose=0
)
return history
def predict_trajectory(self, exit_velocity, launch_angle, spray_angle, spin_rate):
"""Predict trajectory for given initial conditions"""
X = np.array([[exit_velocity, launch_angle, spray_angle, spin_rate]])
X_scaled = self.scaler_X.transform(X)
predictions = self.model.predict(X_scaled, verbose=0)
result = {
'distance': predictions[0][0][0],
'max_height': predictions[1][0][0],
'hang_time': predictions[2][0][0],
'landing_x': predictions[3][0][0],
'landing_y': predictions[3][0][1]
}
return result
# Train trajectory model
print("\n" + "="*60)
print("Training Trajectory Prediction Model")
print("="*60)
trajectory_model = TrajectoryPredictor()
traj_history = trajectory_model.train(epochs=40)
# Example prediction
print("\nExample Trajectory Prediction:")
print("Input: 105 mph exit velocity, 28° launch angle, 10° spray angle, 2200 rpm spin")
prediction = trajectory_model.predict_trajectory(105, 28, 10, 2200)
for key, value in prediction.items():
print(f"{key}: {value:.2f}")
class TrajectoryPredictor:
"""Neural network for baseball trajectory prediction"""
def __init__(self):
self.model = None
self.scaler_X = StandardScaler()
self.scaler_y = StandardScaler()
def generate_trajectory_data(self, n_samples=5000):
"""
Generate synthetic trajectory data based on physics
Simplified model of batted ball flight
"""
np.random.seed(42)
data = []
for _ in range(n_samples):
# Initial conditions
exit_velocity = np.random.uniform(70, 110) # mph
launch_angle = np.random.uniform(-10, 50) # degrees
spray_angle = np.random.uniform(-45, 45) # degrees
spin_rate = np.random.uniform(1000, 3500) # rpm
# Convert to SI units for physics calculation
v0 = exit_velocity * 0.44704 # mph to m/s
theta = np.radians(launch_angle)
phi = np.radians(spray_angle)
# Simplified trajectory calculation (ignoring Magnus force for simplicity)
g = 9.81 # gravity
# Time of flight (simplified)
if theta > 0:
t_flight = 2 * v0 * np.sin(theta) / g
else:
t_flight = 0.1
# Landing distance (simplified)
distance = (v0**2 * np.sin(2 * theta) / g) * 0.3048 # meters to feet
distance = max(0, distance + np.random.normal(0, 10)) # Add noise
# Maximum height
max_height = (v0**2 * np.sin(theta)**2) / (2 * g) * 3.281 # meters to feet
max_height = max(0, max_height + np.random.normal(0, 5))
# Hang time
hang_time = t_flight + np.random.normal(0, 0.1)
hang_time = max(0, hang_time)
# Landing location (x, y) on field
landing_x = distance * np.cos(phi)
landing_y = distance * np.sin(phi)
data.append({
'exit_velocity': exit_velocity,
'launch_angle': launch_angle,
'spray_angle': spray_angle,
'spin_rate': spin_rate,
'distance': distance,
'max_height': max_height,
'hang_time': hang_time,
'landing_x': landing_x,
'landing_y': landing_y
})
return pd.DataFrame(data)
def build_trajectory_model(self, input_dim=4):
"""
Build neural network for trajectory prediction
Multiple outputs: distance, max_height, hang_time, landing coordinates
"""
inputs = layers.Input(shape=(input_dim,))
# Shared layers
x = layers.Dense(128, activation='relu')(inputs)
x = layers.BatchNormalization()(x)
x = layers.Dropout(0.3)(x)
x = layers.Dense(64, activation='relu')(x)
x = layers.BatchNormalization()(x)
x = layers.Dropout(0.3)(x)
x = layers.Dense(32, activation='relu')(x)
x = layers.Dropout(0.2)(x)
# Multiple output heads
distance_output = layers.Dense(16, activation='relu')(x)
distance_output = layers.Dense(1, name='distance')(distance_output)
height_output = layers.Dense(16, activation='relu')(x)
height_output = layers.Dense(1, name='max_height')(height_output)
time_output = layers.Dense(16, activation='relu')(x)
time_output = layers.Dense(1, name='hang_time')(time_output)
location_output = layers.Dense(16, activation='relu')(x)
location_output = layers.Dense(2, name='landing_location')(location_output)
# Create model
model = keras.Model(
inputs=inputs,
outputs=[distance_output, height_output, time_output, location_output]
)
# Compile with different losses for each output
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss={
'distance': 'mse',
'max_height': 'mse',
'hang_time': 'mse',
'landing_location': 'mse'
},
loss_weights={
'distance': 1.0,
'max_height': 0.5,
'hang_time': 0.5,
'landing_location': 0.8
},
metrics=['mae']
)
return model
def train(self, epochs=50):
"""Train trajectory prediction model"""
print("Generating trajectory data...")
df = self.generate_trajectory_data(n_samples=10000)
# Prepare data
input_cols = ['exit_velocity', 'launch_angle', 'spray_angle', 'spin_rate']
X = df[input_cols].values
y_distance = df['distance'].values.reshape(-1, 1)
y_height = df['max_height'].values.reshape(-1, 1)
y_time = df['hang_time'].values.reshape(-1, 1)
y_location = df[['landing_x', 'landing_y']].values
# Split data
split_idx = int(0.8 * len(X))
X_train, X_test = X[:split_idx], X[split_idx:]
y_dist_train, y_dist_test = y_distance[:split_idx], y_distance[split_idx:]
y_height_train, y_height_test = y_height[:split_idx], y_height[split_idx:]
y_time_train, y_time_test = y_time[:split_idx], y_time[split_idx:]
y_loc_train, y_loc_test = y_location[:split_idx], y_location[split_idx:]
# Scale features
X_train = self.scaler_X.fit_transform(X_train)
X_test = self.scaler_X.transform(X_test)
# Build model
self.model = self.build_trajectory_model(input_dim=4)
# Train
history = self.model.fit(
X_train,
{
'distance': y_dist_train,
'max_height': y_height_train,
'hang_time': y_time_train,
'landing_location': y_loc_train
},
validation_data=(
X_test,
{
'distance': y_dist_test,
'max_height': y_height_test,
'hang_time': y_time_test,
'landing_location': y_loc_test
}
),
epochs=epochs,
batch_size=32,
verbose=1
)
print("\nEvaluating model...")
self.model.evaluate(
X_test,
{
'distance': y_dist_test,
'max_height': y_height_test,
'hang_time': y_time_test,
'landing_location': y_loc_test
},
verbose=0
)
return history
def predict_trajectory(self, exit_velocity, launch_angle, spray_angle, spin_rate):
"""Predict trajectory for given initial conditions"""
X = np.array([[exit_velocity, launch_angle, spray_angle, spin_rate]])
X_scaled = self.scaler_X.transform(X)
predictions = self.model.predict(X_scaled, verbose=0)
result = {
'distance': predictions[0][0][0],
'max_height': predictions[1][0][0],
'hang_time': predictions[2][0][0],
'landing_x': predictions[3][0][0],
'landing_y': predictions[3][0][1]
}
return result
# Train trajectory model
print("\n" + "="*60)
print("Training Trajectory Prediction Model")
print("="*60)
trajectory_model = TrajectoryPredictor()
traj_history = trajectory_model.train(epochs=40)
# Example prediction
print("\nExample Trajectory Prediction:")
print("Input: 105 mph exit velocity, 28° launch angle, 10° spray angle, 2200 rpm spin")
prediction = trajectory_model.predict_trajectory(105, 28, 10, 2200)
for key, value in prediction.items():
print(f"{key}: {value:.2f}")
End-to-End Pitch Classification Pipeline
A complete implementation of a pitch type classifier using neural networks, from data preparation to deployment.
class PitchTypeClassifier:
"""Complete pitch type classification system"""
def __init__(self):
self.model = None
self.scaler = StandardScaler()
self.pitch_types = [
'Fastball', 'Sinker', 'Cutter', 'Slider',
'Curveball', 'Changeup', 'Splitter', 'Knuckleball'
]
self.encoder = {pitch: i for i, pitch in enumerate(self.pitch_types)}
self.decoder = {i: pitch for i, pitch in enumerate(self.pitch_types)}
def generate_pitch_data(self, n_samples=10000):
"""Generate realistic pitch type data"""
np.random.seed(42)
data = []
# Define characteristics for each pitch type
pitch_profiles = {
'Fastball': {
'velocity': (92, 3),
'spin_rate': (2300, 200),
'h_break': (0, 6),
'v_break': (15, 4),
'extension': (6.2, 0.3)
},
'Sinker': {
'velocity': (92, 3),
'spin_rate': (2100, 200),
'h_break': (12, 4),
'v_break': (5, 3),
'extension': (6.1, 0.3)
},
'Cutter': {
'velocity': (89, 3),
'spin_rate': (2400, 200),
'h_break': (-4, 3),
'v_break': (10, 3),
'extension': (6.0, 0.3)
},
'Slider': {
'velocity': (85, 3),
'spin_rate': (2500, 250),
'h_break': (4, 4),
'v_break': (-2, 3),
'extension': (6.0, 0.3)
},
'Curveball': {
'velocity': (78, 3),
'spin_rate': (2700, 300),
'h_break': (8, 4),
'v_break': (-8, 3),
'extension': (5.9, 0.3)
},
'Changeup': {
'velocity': (84, 3),
'spin_rate': (1700, 200),
'h_break': (14, 4),
'v_break': (8, 3),
'extension': (6.1, 0.3)
},
'Splitter': {
'velocity': (86, 3),
'spin_rate': (1500, 200),
'h_break': (10, 4),
'v_break': (-4, 3),
'extension': (6.0, 0.3)
},
'Knuckleball': {
'velocity': (68, 4),
'spin_rate': (300, 100),
'h_break': (0, 10),
'v_break': (0, 8),
'extension': (5.8, 0.4)
}
}
samples_per_type = n_samples // len(self.pitch_types)
for pitch_type, profile in pitch_profiles.items():
for _ in range(samples_per_type):
velocity = np.random.normal(*profile['velocity'])
spin_rate = np.random.normal(*profile['spin_rate'])
h_break = np.random.normal(*profile['h_break'])
v_break = np.random.normal(*profile['v_break'])
extension = np.random.normal(*profile['extension'])
# Derived features
spin_axis = np.random.uniform(0, 360) # degrees
data.append({
'release_speed': velocity,
'release_spin_rate': spin_rate,
'pfx_x': h_break,
'pfx_z': v_break,
'release_extension': extension,
'release_pos_x': np.random.normal(0, 0.5),
'release_pos_z': np.random.normal(6, 0.3),
'spin_axis': spin_axis,
'pitch_type': pitch_type
})
return pd.DataFrame(data)
def create_advanced_features(self, df):
"""Engineer advanced features for better classification"""
# Velocity-adjusted spin
df['spin_per_velocity'] = df['release_spin_rate'] / df['release_speed']
# Total break
df['total_break'] = np.sqrt(df['pfx_x']**2 + df['pfx_z']**2)
# Break angle
df['break_angle'] = np.arctan2(df['pfx_z'], df['pfx_x']) * 180 / np.pi
# Velocity bands
df['velo_diff_from_90'] = df['release_speed'] - 90
# Interaction terms
df['spin_break_interaction'] = df['release_spin_rate'] * df['total_break']
return df
def build_classifier(self, input_dim):
"""Build deep neural network classifier"""
model = keras.Sequential([
layers.Input(shape=(input_dim,)),
layers.Dense(256, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.4),
layers.Dense(128, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(64, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(32, activation='relu'),
layers.Dropout(0.2),
layers.Dense(len(self.pitch_types), activation='softmax')
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
return model
def train(self, epochs=50):
"""Train the pitch type classifier"""
print("Generating pitch data...")
df = self.generate_pitch_data(n_samples=15000)
print("Engineering features...")
df = self.create_advanced_features(df)
# Encode target
df['pitch_type_encoded'] = df['pitch_type'].map(self.encoder)
# Select features
feature_cols = [
'release_speed', 'release_spin_rate', 'pfx_x', 'pfx_z',
'release_extension', 'release_pos_x', 'release_pos_z',
'spin_axis', 'spin_per_velocity', 'total_break',
'break_angle', 'velo_diff_from_90', 'spin_break_interaction'
]
X = df[feature_cols].values
y = df['pitch_type_encoded'].values
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Scale features
X_train = self.scaler.fit_transform(X_train)
X_test = self.scaler.transform(X_test)
# Build model
self.model = self.build_classifier(input_dim=len(feature_cols))
# Callbacks
early_stopping = keras.callbacks.EarlyStopping(
monitor='val_accuracy',
patience=15,
restore_best_weights=True
)
reduce_lr = keras.callbacks.ReduceLROnPlateau(
monitor='val_loss',
factor=0.5,
patience=5,
min_lr=0.00001
)
# Train
print("\nTraining classifier...")
history = self.model.fit(
X_train, y_train,
validation_split=0.2,
epochs=epochs,
batch_size=32,
callbacks=[early_stopping, reduce_lr],
verbose=1
)
# Evaluate
test_loss, test_acc = self.model.evaluate(X_test, y_test, verbose=0)
print(f"\nTest Accuracy: {test_acc:.4f}")
# Confusion matrix
from sklearn.metrics import classification_report, confusion_matrix
y_pred = np.argmax(self.model.predict(X_test, verbose=0), axis=1)
print("\nClassification Report:")
print(classification_report(y_test, y_pred,
target_names=self.pitch_types))
return history
def predict_pitch_type(self, pitch_features):
"""Predict pitch type with probabilities"""
# Engineer same features as training
features = {
'release_speed': pitch_features[0],
'release_spin_rate': pitch_features[1],
'pfx_x': pitch_features[2],
'pfx_z': pitch_features[3],
'release_extension': pitch_features[4],
'release_pos_x': pitch_features[5],
'release_pos_z': pitch_features[6],
'spin_axis': pitch_features[7]
}
# Derived features
features['spin_per_velocity'] = features['release_spin_rate'] / features['release_speed']
features['total_break'] = np.sqrt(features['pfx_x']**2 + features['pfx_z']**2)
features['break_angle'] = np.arctan2(features['pfx_z'], features['pfx_x']) * 180 / np.pi
features['velo_diff_from_90'] = features['release_speed'] - 90
features['spin_break_interaction'] = features['release_spin_rate'] * features['total_break']
# Create feature vector
X = np.array([[
features['release_speed'], features['release_spin_rate'],
features['pfx_x'], features['pfx_z'], features['release_extension'],
features['release_pos_x'], features['release_pos_z'],
features['spin_axis'], features['spin_per_velocity'],
features['total_break'], features['break_angle'],
features['velo_diff_from_90'], features['spin_break_interaction']
]])
X_scaled = self.scaler.transform(X)
probabilities = self.model.predict(X_scaled, verbose=0)[0]
predictions = {pitch: prob for pitch, prob in
zip(self.pitch_types, probabilities)}
predicted_type = max(predictions, key=predictions.get)
return predicted_type, predictions
# Train pitch type classifier
print("\n" + "="*60)
print("Training Pitch Type Classifier")
print("="*60)
pitch_classifier = PitchTypeClassifier()
pitch_history = pitch_classifier.train(epochs=40)
# Example predictions
print("\n" + "="*60)
print("Example Pitch Predictions")
print("="*60)
# Fastball
fastball_features = [94, 2350, 0, 16, 6.2, 0, 6.0, 180]
pitch_type, probs = pitch_classifier.predict_pitch_type(fastball_features)
print(f"\nFastball-like pitch (94 mph, 2350 rpm):")
print(f"Predicted: {pitch_type}")
print(f"Top 3 probabilities:")
sorted_probs = sorted(probs.items(), key=lambda x: x[1], reverse=True)[:3]
for pitch, prob in sorted_probs:
print(f" {pitch}: {prob:.3f}")
# Curveball
curveball_features = [77, 2750, 8, -9, 5.9, 0, 6.0, 90]
pitch_type, probs = pitch_classifier.predict_pitch_type(curveball_features)
print(f"\nCurveball-like pitch (77 mph, 2750 rpm):")
print(f"Predicted: {pitch_type}")
print(f"Top 3 probabilities:")
sorted_probs = sorted(probs.items(), key=lambda x: x[1], reverse=True)[:3]
for pitch, prob in sorted_probs:
print(f" {pitch}: {prob:.3f}")
class PitchTypeClassifier:
"""Complete pitch type classification system"""
def __init__(self):
self.model = None
self.scaler = StandardScaler()
self.pitch_types = [
'Fastball', 'Sinker', 'Cutter', 'Slider',
'Curveball', 'Changeup', 'Splitter', 'Knuckleball'
]
self.encoder = {pitch: i for i, pitch in enumerate(self.pitch_types)}
self.decoder = {i: pitch for i, pitch in enumerate(self.pitch_types)}
def generate_pitch_data(self, n_samples=10000):
"""Generate realistic pitch type data"""
np.random.seed(42)
data = []
# Define characteristics for each pitch type
pitch_profiles = {
'Fastball': {
'velocity': (92, 3),
'spin_rate': (2300, 200),
'h_break': (0, 6),
'v_break': (15, 4),
'extension': (6.2, 0.3)
},
'Sinker': {
'velocity': (92, 3),
'spin_rate': (2100, 200),
'h_break': (12, 4),
'v_break': (5, 3),
'extension': (6.1, 0.3)
},
'Cutter': {
'velocity': (89, 3),
'spin_rate': (2400, 200),
'h_break': (-4, 3),
'v_break': (10, 3),
'extension': (6.0, 0.3)
},
'Slider': {
'velocity': (85, 3),
'spin_rate': (2500, 250),
'h_break': (4, 4),
'v_break': (-2, 3),
'extension': (6.0, 0.3)
},
'Curveball': {
'velocity': (78, 3),
'spin_rate': (2700, 300),
'h_break': (8, 4),
'v_break': (-8, 3),
'extension': (5.9, 0.3)
},
'Changeup': {
'velocity': (84, 3),
'spin_rate': (1700, 200),
'h_break': (14, 4),
'v_break': (8, 3),
'extension': (6.1, 0.3)
},
'Splitter': {
'velocity': (86, 3),
'spin_rate': (1500, 200),
'h_break': (10, 4),
'v_break': (-4, 3),
'extension': (6.0, 0.3)
},
'Knuckleball': {
'velocity': (68, 4),
'spin_rate': (300, 100),
'h_break': (0, 10),
'v_break': (0, 8),
'extension': (5.8, 0.4)
}
}
samples_per_type = n_samples // len(self.pitch_types)
for pitch_type, profile in pitch_profiles.items():
for _ in range(samples_per_type):
velocity = np.random.normal(*profile['velocity'])
spin_rate = np.random.normal(*profile['spin_rate'])
h_break = np.random.normal(*profile['h_break'])
v_break = np.random.normal(*profile['v_break'])
extension = np.random.normal(*profile['extension'])
# Derived features
spin_axis = np.random.uniform(0, 360) # degrees
data.append({
'release_speed': velocity,
'release_spin_rate': spin_rate,
'pfx_x': h_break,
'pfx_z': v_break,
'release_extension': extension,
'release_pos_x': np.random.normal(0, 0.5),
'release_pos_z': np.random.normal(6, 0.3),
'spin_axis': spin_axis,
'pitch_type': pitch_type
})
return pd.DataFrame(data)
def create_advanced_features(self, df):
"""Engineer advanced features for better classification"""
# Velocity-adjusted spin
df['spin_per_velocity'] = df['release_spin_rate'] / df['release_speed']
# Total break
df['total_break'] = np.sqrt(df['pfx_x']**2 + df['pfx_z']**2)
# Break angle
df['break_angle'] = np.arctan2(df['pfx_z'], df['pfx_x']) * 180 / np.pi
# Velocity bands
df['velo_diff_from_90'] = df['release_speed'] - 90
# Interaction terms
df['spin_break_interaction'] = df['release_spin_rate'] * df['total_break']
return df
def build_classifier(self, input_dim):
"""Build deep neural network classifier"""
model = keras.Sequential([
layers.Input(shape=(input_dim,)),
layers.Dense(256, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.4),
layers.Dense(128, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(64, activation='relu'),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(32, activation='relu'),
layers.Dropout(0.2),
layers.Dense(len(self.pitch_types), activation='softmax')
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
return model
def train(self, epochs=50):
"""Train the pitch type classifier"""
print("Generating pitch data...")
df = self.generate_pitch_data(n_samples=15000)
print("Engineering features...")
df = self.create_advanced_features(df)
# Encode target
df['pitch_type_encoded'] = df['pitch_type'].map(self.encoder)
# Select features
feature_cols = [
'release_speed', 'release_spin_rate', 'pfx_x', 'pfx_z',
'release_extension', 'release_pos_x', 'release_pos_z',
'spin_axis', 'spin_per_velocity', 'total_break',
'break_angle', 'velo_diff_from_90', 'spin_break_interaction'
]
X = df[feature_cols].values
y = df['pitch_type_encoded'].values
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Scale features
X_train = self.scaler.fit_transform(X_train)
X_test = self.scaler.transform(X_test)
# Build model
self.model = self.build_classifier(input_dim=len(feature_cols))
# Callbacks
early_stopping = keras.callbacks.EarlyStopping(
monitor='val_accuracy',
patience=15,
restore_best_weights=True
)
reduce_lr = keras.callbacks.ReduceLROnPlateau(
monitor='val_loss',
factor=0.5,
patience=5,
min_lr=0.00001
)
# Train
print("\nTraining classifier...")
history = self.model.fit(
X_train, y_train,
validation_split=0.2,
epochs=epochs,
batch_size=32,
callbacks=[early_stopping, reduce_lr],
verbose=1
)
# Evaluate
test_loss, test_acc = self.model.evaluate(X_test, y_test, verbose=0)
print(f"\nTest Accuracy: {test_acc:.4f}")
# Confusion matrix
from sklearn.metrics import classification_report, confusion_matrix
y_pred = np.argmax(self.model.predict(X_test, verbose=0), axis=1)
print("\nClassification Report:")
print(classification_report(y_test, y_pred,
target_names=self.pitch_types))
return history
def predict_pitch_type(self, pitch_features):
"""Predict pitch type with probabilities"""
# Engineer same features as training
features = {
'release_speed': pitch_features[0],
'release_spin_rate': pitch_features[1],
'pfx_x': pitch_features[2],
'pfx_z': pitch_features[3],
'release_extension': pitch_features[4],
'release_pos_x': pitch_features[5],
'release_pos_z': pitch_features[6],
'spin_axis': pitch_features[7]
}
# Derived features
features['spin_per_velocity'] = features['release_spin_rate'] / features['release_speed']
features['total_break'] = np.sqrt(features['pfx_x']**2 + features['pfx_z']**2)
features['break_angle'] = np.arctan2(features['pfx_z'], features['pfx_x']) * 180 / np.pi
features['velo_diff_from_90'] = features['release_speed'] - 90
features['spin_break_interaction'] = features['release_spin_rate'] * features['total_break']
# Create feature vector
X = np.array([[
features['release_speed'], features['release_spin_rate'],
features['pfx_x'], features['pfx_z'], features['release_extension'],
features['release_pos_x'], features['release_pos_z'],
features['spin_axis'], features['spin_per_velocity'],
features['total_break'], features['break_angle'],
features['velo_diff_from_90'], features['spin_break_interaction']
]])
X_scaled = self.scaler.transform(X)
probabilities = self.model.predict(X_scaled, verbose=0)[0]
predictions = {pitch: prob for pitch, prob in
zip(self.pitch_types, probabilities)}
predicted_type = max(predictions, key=predictions.get)
return predicted_type, predictions
# Train pitch type classifier
print("\n" + "="*60)
print("Training Pitch Type Classifier")
print("="*60)
pitch_classifier = PitchTypeClassifier()
pitch_history = pitch_classifier.train(epochs=40)
# Example predictions
print("\n" + "="*60)
print("Example Pitch Predictions")
print("="*60)
# Fastball
fastball_features = [94, 2350, 0, 16, 6.2, 0, 6.0, 180]
pitch_type, probs = pitch_classifier.predict_pitch_type(fastball_features)
print(f"\nFastball-like pitch (94 mph, 2350 rpm):")
print(f"Predicted: {pitch_type}")
print(f"Top 3 probabilities:")
sorted_probs = sorted(probs.items(), key=lambda x: x[1], reverse=True)[:3]
for pitch, prob in sorted_probs:
print(f" {pitch}: {prob:.3f}")
# Curveball
curveball_features = [77, 2750, 8, -9, 5.9, 0, 6.0, 90]
pitch_type, probs = pitch_classifier.predict_pitch_type(curveball_features)
print(f"\nCurveball-like pitch (77 mph, 2750 rpm):")
print(f"Predicted: {pitch_type}")
print(f"Top 3 probabilities:")
sorted_probs = sorted(probs.items(), key=lambda x: x[1], reverse=True)[:3]
for pitch, prob in sorted_probs:
print(f" {pitch}: {prob:.3f}")
Complete Pipeline: From Data to Deployment
A production-ready deep learning pipeline for baseball analytics.
import joblib
from pathlib import Path
import json
class BaseballMLPipeline:
"""
Production-ready ML pipeline for baseball predictions
Includes data validation, preprocessing, training, and inference
"""
def __init__(self, model_dir='models'):
self.model_dir = Path(model_dir)
self.model_dir.mkdir(exist_ok=True)
self.model = None
self.preprocessor = None
self.config = {}
def validate_data(self, df, required_columns):
"""Validate input data"""
missing_cols = set(required_columns) - set(df.columns)
if missing_cols:
raise ValueError(f"Missing required columns: {missing_cols}")
# Check for nulls
null_counts = df[required_columns].isnull().sum()
if null_counts.any():
print("Warning: Found null values:")
print(null_counts[null_counts > 0])
return True
def preprocess_data(self, df, fit=False):
"""Preprocess data with validation"""
if fit:
self.preprocessor = StandardScaler()
X_scaled = self.preprocessor.fit_transform(df)
else:
if self.preprocessor is None:
raise ValueError("Preprocessor not fitted. Call with fit=True first.")
X_scaled = self.preprocessor.transform(df)
return X_scaled
def build_model(self, input_dim, output_dim, task='classification'):
"""Build model based on task type"""
if task == 'classification':
activation = 'softmax'
loss = 'sparse_categorical_crossentropy'
metrics = ['accuracy']
else: # regression
activation = 'linear'
loss = 'mse'
metrics = ['mae']
model = keras.Sequential([
layers.Input(shape=(input_dim,)),
layers.Dense(256, activation='relu',
kernel_regularizer=keras.regularizers.l2(0.001)),
layers.BatchNormalization(),
layers.Dropout(0.4),
layers.Dense(128, activation='relu',
kernel_regularizer=keras.regularizers.l2(0.001)),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(64, activation='relu'),
layers.Dropout(0.3),
layers.Dense(output_dim, activation=activation)
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss=loss,
metrics=metrics
)
return model
def train_with_cross_validation(self, X, y, n_splits=5):
"""Train with k-fold cross-validation"""
from sklearn.model_selection import KFold
kfold = KFold(n_splits=n_splits, shuffle=True, random_state=42)
cv_scores = []
for fold, (train_idx, val_idx) in enumerate(kfold.split(X)):
print(f"\nFold {fold + 1}/{n_splits}")
X_train, X_val = X[train_idx], X[val_idx]
y_train, y_val = y[train_idx], y[val_idx]
model = self.build_model(
input_dim=X.shape[1],
output_dim=1,
task='regression'
)
history = model.fit(
X_train, y_train,
validation_data=(X_val, y_val),
epochs=30,
batch_size=32,
verbose=0
)
val_loss = min(history.history['val_loss'])
cv_scores.append(val_loss)
print(f"Validation Loss: {val_loss:.4f}")
print(f"\nCross-Validation Results:")
print(f"Mean Loss: {np.mean(cv_scores):.4f}")
print(f"Std Loss: {np.std(cv_scores):.4f}")
return cv_scores
def save_model(self, model_name):
"""Save model and preprocessor"""
model_path = self.model_dir / f"{model_name}.h5"
preprocessor_path = self.model_dir / f"{model_name}_preprocessor.pkl"
config_path = self.model_dir / f"{model_name}_config.json"
self.model.save(model_path)
joblib.dump(self.preprocessor, preprocessor_path)
with open(config_path, 'w') as f:
json.dump(self.config, f, indent=2)
print(f"\nModel saved to {model_path}")
print(f"Preprocessor saved to {preprocessor_path}")
print(f"Config saved to {config_path}")
def load_model(self, model_name):
"""Load model and preprocessor"""
model_path = self.model_dir / f"{model_name}.h5"
preprocessor_path = self.model_dir / f"{model_name}_preprocessor.pkl"
config_path = self.model_dir / f"{model_name}_config.json"
self.model = keras.models.load_model(model_path)
self.preprocessor = joblib.load(preprocessor_path)
with open(config_path, 'r') as f:
self.config = json.load(f)
print(f"Model loaded from {model_path}")
def predict_with_uncertainty(self, X, n_iterations=10):
"""
Predict with uncertainty estimation using Monte Carlo Dropout
"""
# Enable dropout at inference time
predictions = []
for _ in range(n_iterations):
# Predict with dropout active
pred = self.model(X, training=True)
predictions.append(pred.numpy())
predictions = np.array(predictions)
# Calculate mean and std
mean_prediction = predictions.mean(axis=0)
std_prediction = predictions.std(axis=0)
return mean_prediction, std_prediction
# Example usage
print("\n" + "="*60)
print("Baseball ML Pipeline Example")
print("="*60)
pipeline = BaseballMLPipeline(model_dir='baseball_models')
# Create sample data
sample_data = create_batter_features()
feature_cols = ['exit_velocity', 'launch_angle', 'sprint_speed',
'barrel_rate', 'hard_hit_rate', 'k_rate',
'bb_rate', 'pull_rate', 'groundball_rate']
X = sample_data[feature_cols].values
y = sample_data['wOBA'].values.reshape(-1, 1)
# Validate
pipeline.validate_data(sample_data, feature_cols)
# Preprocess
X_processed = pipeline.preprocess_data(X, fit=True)
# Cross-validation
cv_scores = pipeline.train_with_cross_validation(X_processed, y, n_splits=3)
Hyperparameter Tuning with Keras Tuner
try:
import keras_tuner as kt
def build_tunable_model(hp):
"""Build model with hyperparameter tuning"""
model = keras.Sequential()
model.add(layers.Input(shape=(9,)))
# Tune number of layers
for i in range(hp.Int('num_layers', 2, 5)):
model.add(layers.Dense(
units=hp.Int(f'units_{i}', min_value=32, max_value=256, step=32),
activation='relu'
))
model.add(layers.Dropout(
hp.Float(f'dropout_{i}', min_value=0.0, max_value=0.5, step=0.1)
))
model.add(layers.Dense(1))
# Tune learning rate
learning_rate = hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
loss='mse',
metrics=['mae']
)
return model
print("\nHyperparameter tuning example created")
print("To run tuning, use:")
print("tuner = kt.RandomSearch(build_tunable_model, ...)")
except ImportError:
print("\nKeras Tuner not installed. Install with: pip install keras-tuner")
Model Monitoring and Performance Tracking
class ModelMonitor:
"""Monitor model performance over time"""
def __init__(self):
self.performance_history = []
def log_prediction(self, y_true, y_pred, timestamp=None):
"""Log prediction for monitoring"""
if timestamp is None:
timestamp = pd.Timestamp.now()
error = abs(y_true - y_pred)
self.performance_history.append({
'timestamp': timestamp,
'y_true': y_true,
'y_pred': y_pred,
'error': error
})
def get_metrics(self, window='7D'):
"""Calculate metrics over time window"""
df = pd.DataFrame(self.performance_history)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.set_index('timestamp')
# Calculate rolling metrics
metrics = {
'mae': df['error'].resample(window).mean(),
'mse': (df['error'] ** 2).resample(window).mean(),
'count': df['error'].resample(window).count()
}
return metrics
def detect_drift(self, threshold=0.1):
"""Detect if model performance is degrading"""
if len(self.performance_history) < 100:
return False
recent_errors = [p['error'] for p in self.performance_history[-50:]]
baseline_errors = [p['error'] for p in self.performance_history[:50]]
recent_mae = np.mean(recent_errors)
baseline_mae = np.mean(baseline_errors)
drift = (recent_mae - baseline_mae) / baseline_mae
return drift > threshold
print("\nModel monitoring system created")
import joblib
from pathlib import Path
import json
class BaseballMLPipeline:
"""
Production-ready ML pipeline for baseball predictions
Includes data validation, preprocessing, training, and inference
"""
def __init__(self, model_dir='models'):
self.model_dir = Path(model_dir)
self.model_dir.mkdir(exist_ok=True)
self.model = None
self.preprocessor = None
self.config = {}
def validate_data(self, df, required_columns):
"""Validate input data"""
missing_cols = set(required_columns) - set(df.columns)
if missing_cols:
raise ValueError(f"Missing required columns: {missing_cols}")
# Check for nulls
null_counts = df[required_columns].isnull().sum()
if null_counts.any():
print("Warning: Found null values:")
print(null_counts[null_counts > 0])
return True
def preprocess_data(self, df, fit=False):
"""Preprocess data with validation"""
if fit:
self.preprocessor = StandardScaler()
X_scaled = self.preprocessor.fit_transform(df)
else:
if self.preprocessor is None:
raise ValueError("Preprocessor not fitted. Call with fit=True first.")
X_scaled = self.preprocessor.transform(df)
return X_scaled
def build_model(self, input_dim, output_dim, task='classification'):
"""Build model based on task type"""
if task == 'classification':
activation = 'softmax'
loss = 'sparse_categorical_crossentropy'
metrics = ['accuracy']
else: # regression
activation = 'linear'
loss = 'mse'
metrics = ['mae']
model = keras.Sequential([
layers.Input(shape=(input_dim,)),
layers.Dense(256, activation='relu',
kernel_regularizer=keras.regularizers.l2(0.001)),
layers.BatchNormalization(),
layers.Dropout(0.4),
layers.Dense(128, activation='relu',
kernel_regularizer=keras.regularizers.l2(0.001)),
layers.BatchNormalization(),
layers.Dropout(0.3),
layers.Dense(64, activation='relu'),
layers.Dropout(0.3),
layers.Dense(output_dim, activation=activation)
])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss=loss,
metrics=metrics
)
return model
def train_with_cross_validation(self, X, y, n_splits=5):
"""Train with k-fold cross-validation"""
from sklearn.model_selection import KFold
kfold = KFold(n_splits=n_splits, shuffle=True, random_state=42)
cv_scores = []
for fold, (train_idx, val_idx) in enumerate(kfold.split(X)):
print(f"\nFold {fold + 1}/{n_splits}")
X_train, X_val = X[train_idx], X[val_idx]
y_train, y_val = y[train_idx], y[val_idx]
model = self.build_model(
input_dim=X.shape[1],
output_dim=1,
task='regression'
)
history = model.fit(
X_train, y_train,
validation_data=(X_val, y_val),
epochs=30,
batch_size=32,
verbose=0
)
val_loss = min(history.history['val_loss'])
cv_scores.append(val_loss)
print(f"Validation Loss: {val_loss:.4f}")
print(f"\nCross-Validation Results:")
print(f"Mean Loss: {np.mean(cv_scores):.4f}")
print(f"Std Loss: {np.std(cv_scores):.4f}")
return cv_scores
def save_model(self, model_name):
"""Save model and preprocessor"""
model_path = self.model_dir / f"{model_name}.h5"
preprocessor_path = self.model_dir / f"{model_name}_preprocessor.pkl"
config_path = self.model_dir / f"{model_name}_config.json"
self.model.save(model_path)
joblib.dump(self.preprocessor, preprocessor_path)
with open(config_path, 'w') as f:
json.dump(self.config, f, indent=2)
print(f"\nModel saved to {model_path}")
print(f"Preprocessor saved to {preprocessor_path}")
print(f"Config saved to {config_path}")
def load_model(self, model_name):
"""Load model and preprocessor"""
model_path = self.model_dir / f"{model_name}.h5"
preprocessor_path = self.model_dir / f"{model_name}_preprocessor.pkl"
config_path = self.model_dir / f"{model_name}_config.json"
self.model = keras.models.load_model(model_path)
self.preprocessor = joblib.load(preprocessor_path)
with open(config_path, 'r') as f:
self.config = json.load(f)
print(f"Model loaded from {model_path}")
def predict_with_uncertainty(self, X, n_iterations=10):
"""
Predict with uncertainty estimation using Monte Carlo Dropout
"""
# Enable dropout at inference time
predictions = []
for _ in range(n_iterations):
# Predict with dropout active
pred = self.model(X, training=True)
predictions.append(pred.numpy())
predictions = np.array(predictions)
# Calculate mean and std
mean_prediction = predictions.mean(axis=0)
std_prediction = predictions.std(axis=0)
return mean_prediction, std_prediction
# Example usage
print("\n" + "="*60)
print("Baseball ML Pipeline Example")
print("="*60)
pipeline = BaseballMLPipeline(model_dir='baseball_models')
# Create sample data
sample_data = create_batter_features()
feature_cols = ['exit_velocity', 'launch_angle', 'sprint_speed',
'barrel_rate', 'hard_hit_rate', 'k_rate',
'bb_rate', 'pull_rate', 'groundball_rate']
X = sample_data[feature_cols].values
y = sample_data['wOBA'].values.reshape(-1, 1)
# Validate
pipeline.validate_data(sample_data, feature_cols)
# Preprocess
X_processed = pipeline.preprocess_data(X, fit=True)
# Cross-validation
cv_scores = pipeline.train_with_cross_validation(X_processed, y, n_splits=3)
try:
import keras_tuner as kt
def build_tunable_model(hp):
"""Build model with hyperparameter tuning"""
model = keras.Sequential()
model.add(layers.Input(shape=(9,)))
# Tune number of layers
for i in range(hp.Int('num_layers', 2, 5)):
model.add(layers.Dense(
units=hp.Int(f'units_{i}', min_value=32, max_value=256, step=32),
activation='relu'
))
model.add(layers.Dropout(
hp.Float(f'dropout_{i}', min_value=0.0, max_value=0.5, step=0.1)
))
model.add(layers.Dense(1))
# Tune learning rate
learning_rate = hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
loss='mse',
metrics=['mae']
)
return model
print("\nHyperparameter tuning example created")
print("To run tuning, use:")
print("tuner = kt.RandomSearch(build_tunable_model, ...)")
except ImportError:
print("\nKeras Tuner not installed. Install with: pip install keras-tuner")
class ModelMonitor:
"""Monitor model performance over time"""
def __init__(self):
self.performance_history = []
def log_prediction(self, y_true, y_pred, timestamp=None):
"""Log prediction for monitoring"""
if timestamp is None:
timestamp = pd.Timestamp.now()
error = abs(y_true - y_pred)
self.performance_history.append({
'timestamp': timestamp,
'y_true': y_true,
'y_pred': y_pred,
'error': error
})
def get_metrics(self, window='7D'):
"""Calculate metrics over time window"""
df = pd.DataFrame(self.performance_history)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.set_index('timestamp')
# Calculate rolling metrics
metrics = {
'mae': df['error'].resample(window).mean(),
'mse': (df['error'] ** 2).resample(window).mean(),
'count': df['error'].resample(window).count()
}
return metrics
def detect_drift(self, threshold=0.1):
"""Detect if model performance is degrading"""
if len(self.performance_history) < 100:
return False
recent_errors = [p['error'] for p in self.performance_history[-50:]]
baseline_errors = [p['error'] for p in self.performance_history[:50]]
recent_mae = np.mean(recent_errors)
baseline_mae = np.mean(baseline_errors)
drift = (recent_mae - baseline_mae) / baseline_mae
return drift > threshold
print("\nModel monitoring system created")
Easy Exercises
Exercise 28.1: Basic Neural Network (Easy)
Build a simple neural network to predict whether a pitch will be a strike or ball based on plate location (plate_x, plate_z).
# Your solution here
# Hint: Use a binary classification model with sigmoid activation
Exercise 28.2: Feature Engineering (Easy)
Create 5 new features from pitch tracking data that might improve pitch outcome prediction. Explain why each feature might be useful.
Exercise 28.3: Model Evaluation (Easy)
Given a trained model, calculate and interpret the following metrics:
- Accuracy
- Precision
- Recall
- F1-score
Medium Exercises
Exercise 28.4: Multi-Task Learning (Medium)
Build a neural network that simultaneously predicts:
- Whether a pitch will be a strike or ball
- Whether the batter will swing
Share weights between tasks and compare performance to two separate models.
# Your solution here
# Hint: Use multiple output heads with shared hidden layers
Exercise 28.5: Sequence Prediction (Medium)
Implement an LSTM that predicts the next pitch type in a sequence based on the previous 5 pitches and the game context (count, score, inning).
Exercise 28.6: Transfer Learning (Medium)
Use a pre-trained image model (ResNet, EfficientNet) to classify different batting stances from images. Fine-tune only the last few layers.
Hard Exercises
Exercise 28.7: Attention Mechanism (Hard)
Implement a Transformer-based model for pitch sequence prediction. The model should use attention to weigh the importance of different pitches in the sequence.
# Your solution here
# Hint: Use multi-head attention layers
class PitchTransformer(nn.Module):
def __init__(self, d_model=64, nhead=4, num_layers=2):
super().__init__()
# Your implementation here
pass
Exercise 28.8: GANs for Data Augmentation (Hard)
Build a Generative Adversarial Network (GAN) to generate synthetic pitch tracking data. Use this to augment a small dataset and show improved model performance.
Exercise 28.9: Reinforcement Learning for Strategy (Hard)
Implement a Deep Q-Network (DQN) that learns optimal pitch selection strategy. The agent should choose which pitch type to throw based on:
- Current count
- Batter statistics
- Previous pitches in the at-bat
Exercise 28.10: Explainable AI (Hard)
Implement SHAP (SHapley Additive exPlanations) or LIME (Local Interpretable Model-agnostic Explanations) to explain individual predictions from your pitch outcome model. Create visualizations showing which features were most important for specific predictions.
# Your solution here
# Hint: Use the shap library
import shap
# explainer = shap.DeepExplainer(model, background_data)
# shap_values = explainer.shap_values(X_test)
Project Exercise
Exercise 28.11: Complete Baseball Analytics System (Hard)
Build an end-to-end system that:
- Collects pitch-by-pitch data (you can use synthetic data)
- Trains multiple models:
- Pitch type classifier
- Pitch outcome predictor
- Batter performance forecaster
- Implements a REST API for predictions
- Creates a dashboard for monitoring model performance
- Includes automated retraining when performance degrades
Bonus challenges:
- Implement A/B testing for model versions
- Add model versioning and rollback capabilities
- Create confidence intervals for predictions
- Implement real-time inference with sub-100ms latency
Summary
In this chapter, we explored deep learning applications in baseball analytics:
- Neural Network Fundamentals: Learned how to apply feedforward networks to baseball statistics
- Pitch Outcome Prediction: Built multi-class classifiers for pitch results using deep networks
- LSTM for Time Series: Forecasted player performance using recurrent neural networks
- Computer Vision: Applied CNNs to swing analysis and pitch recognition
- Trajectory Modeling: Used physics-informed neural networks for ball flight prediction
- Pitch Classification: Created a complete pipeline for pitch type identification
- Production Pipelines: Implemented end-to-end systems with monitoring and deployment
Key Takeaways:
- Deep learning excels at finding complex patterns in high-dimensional baseball data
- LSTMs are powerful for sequential data like pitch sequences and player performance
- Transfer learning can accelerate computer vision applications
- Production systems require careful attention to data validation, monitoring, and retraining
- Ensemble methods and uncertainty quantification improve prediction reliability
Further Reading:
- "Deep Learning" by Goodfellow, Bengio, and Courville
- "Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow" by Aurélien Géron
- MLB Statcast data documentation
- PyTorch and TensorFlow official tutorials
- Papers on sports analytics from MIT Sloan Sports Analytics Conference
Tools and Libraries:
- TensorFlow/Keras: High-level deep learning framework
- PyTorch: Flexible deep learning framework for research
- Keras Tuner: Hyperparameter optimization
- SHAP: Model explainability
- MLflow: Experiment tracking and model management
- TensorBoard: Training visualization
Continue to Chapter 29 to explore ensemble methods and model stacking in baseball analytics.
# Your solution here
# Hint: Use a binary classification model with sigmoid activation
# Your solution here
# Hint: Use multiple output heads with shared hidden layers
# Your solution here
# Hint: Use multi-head attention layers
class PitchTransformer(nn.Module):
def __init__(self, d_model=64, nhead=4, num_layers=2):
super().__init__()
# Your implementation here
pass
# Your solution here
# Hint: Use the shap library
import shap
# explainer = shap.DeepExplainer(model, background_data)
# shap_values = explainer.shap_values(X_test)