Here’s a more complete GPU-accelerated signal-generation pipeline for quantitative finance, written in Python with CuPy and Numba’s `numba.cuda` module. It includes:
- Data simulation (price series)
- GPU-based moving averages
- A nonlinear (clipping) filter kernel written with `numba.cuda`
- Signal generation logic (crossover + filter)
- Batch processing of multiple assets (assets are handled one after another in a Python loop; each asset's computation runs on the GPU)
import cupy as cp
import numpy as np
from numba import cuda
# --- Step 1: GPU Moving Average with CuPy ---
def moving_average_gpu(signal, window):
    """Return the simple moving average of *signal* over *window* samples.

    The average is computed as a 'valid'-mode convolution with a uniform
    kernel, so only positions where the window fully overlaps the input are
    returned.

    Args:
        signal: Array of samples accepted by ``cp.convolve`` (presumably a
            1-D CuPy array -- confirm against callers).
        window: Number of samples per average; must be at least 1.

    Returns:
        The array produced by ``cp.convolve(signal, kernel, mode='valid')``.
        When ``window <= len(signal)`` it holds
        ``len(signal) - window + 1`` averages.

    Raises:
        ValueError: If *window* is smaller than 1.
    """
    # Fail early with a clear message instead of relying on the backend to
    # reject an empty or negative-length kernel.
    if window < 1:
        raise ValueError(f"window must be a positive integer, got {window!r}")

    # NOTE(review): if window exceeds len(signal), convolve is expected to
    # swap its operands (NumPy semantics), so the result would not be a
    # moving average -- confirm and validate upstream if short series occur.
    kernel = cp.ones(window) / window
    return cp.convolve(signal, kernel, mode='valid')
# --- Step 2: Numba.cuda Kernel for nonlinear filtering ---
@cuda.jit
def nonlinear_filter_kernel(data, output, threshold):
    """Copy *data* into *output*, limiting each value by *threshold*.

    Each thread handles the single element at its global 1-D grid index:
    values above ``threshold`` are written as ``threshold``, values below
    ``-threshold`` are written as ``-threshold``, and everything else is
    copied unchanged. For a non-negative threshold this clamps the data to
    ``[-threshold, threshold]``.

    Args:
        data: Input array; only indices below ``data.size`` are read.
        output: Destination array, written at the same indices as *data*
            (assumed to hold at least ``data.size`` elements -- confirm at
            the call site).
        threshold: Clipping magnitude.
    """
    pos = cuda.grid(1)
    # Threads past the end of the input have nothing to do.
    if pos >= data.size:
        return

    sample = data[pos]
    if sample > threshold:
        output[pos] = threshold
    elif sample < -threshold:
        output[pos] = -threshold
    else:
        output[pos] = sample
# --- Step 3: Generate signal using moving average crossover + nonlinear filter ---
def generate_signal(price_series, short_window=10, long_window=30, threshold=0.5):
    """Build a clipped moving-average crossover signal for one price series.

    The raw signal is +1.0 where the short moving average is above the long
    one and -1.0 otherwise; it is then passed through
    ``nonlinear_filter_kernel`` with the given threshold.

    All intermediate arrays stay on the GPU: Numba CUDA kernels accept CuPy
    arrays directly through the CUDA Array Interface, so no host round trip
    is needed around the kernel launch.

    Args:
        price_series: Price samples for a single asset, as accepted by
            ``moving_average_gpu`` (presumably a 1-D CuPy array).
        short_window: Window length of the fast moving average.
        long_window: Window length of the slow moving average.
        threshold: Clipping magnitude forwarded to the filter kernel.

    Returns:
        A CuPy array holding the filtered signal, one value per aligned
        moving-average sample.
    """
    short_ma = moving_average_gpu(price_series, short_window)
    long_ma = moving_average_gpu(price_series, long_window)

    # Keep only the trailing samples both averages have in common so that
    # element i of each array refers to the same position in the series.
    min_len = min(len(short_ma), len(long_ma))
    short_ma = short_ma[-min_len:]
    long_ma = long_ma[-min_len:]

    # Raw signal: +1 where short_ma > long_ma, else -1.
    raw_signal = cp.where(short_ma > long_ma, 1.0, -1.0)

    # Device-side output buffer with the same shape and dtype as the input.
    filtered_signal = cp.empty_like(raw_signal)

    # One thread per element, rounded up to whole blocks.
    threads_per_block = 256
    blocks_per_grid = (raw_signal.size + threads_per_block - 1) // threads_per_block
    launch = nonlinear_filter_kernel[blocks_per_grid, threads_per_block]
    launch(raw_signal, filtered_signal, threshold)

    return filtered_signal
# --- Step 4: Batch process multiple price series ---
def batch_generate_signals(price_batch, short_window=10, long_window=30, threshold=0.5):
    """Generate a signal for every asset in *price_batch*.

    Rows are processed one at a time with ``generate_signal`` and the
    per-asset results are stacked into a single array.

    Args:
        price_batch: 2-D CuPy array of shape ``(num_assets, num_points)``.
        short_window: Forwarded to ``generate_signal``.
        long_window: Forwarded to ``generate_signal``.
        threshold: Forwarded to ``generate_signal``.

    Returns:
        The result of ``cp.stack`` over the per-asset signals, i.e. one row
        per asset.
    """
    asset_count = price_batch.shape[0]
    per_asset = [
        generate_signal(price_batch[row], short_window, long_window, threshold)
        for row in range(asset_count)
    ]
    return cp.stack(per_asset)
# --- Example Usage ---
# Demo configuration: a batch of 5 assets with 100k price points each.
num_assets = 5
num_points = 100_000

# Random-walk prices built on the GPU: float32 Gaussian steps are summed
# cumulatively along each asset's row and shifted up by 100.
price_batch_gpu = (
    cp.cumsum(
        cp.random.randn(num_assets, num_points).astype(cp.float32),
        axis=1,
    )
    + 100
)

# Run the signal pipeline over every asset in the batch.
signals_gpu = batch_generate_signals(price_batch_gpu)

# Copy the first asset's signal to host memory and show its first values.
example_signal = cp.asnumpy(signals_gpu[0])
print("Sample signal values:", example_signal[:20])