This is the third post in a series developing a comprehensive software suite for a robot arm, one that could eventually grow into a realistic application for everyday or industrial use. The following features are added in this installment:
Learning capabilities: Implement a machine learning model that allows the robot to learn new tasks or improve its performance over time.
Voice synthesis: Add text-to-speech capabilities for vocal responses from the robot.
Gesture recognition: Implement the ability to understand and respond to hand gestures.
The code is organized into three files: main.py, robot_arm.py, and robot_learner.py, each listed in full below.
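# main.py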
# Import necessary libraries
import speech_recognition as sr
import spacy
import cv2 # python -m pip install opencv-python
import numpy as np
import pyttsx3 # python -m pip install pyttsx3
import mediapipe as mp # python -m pip install mediapipe
from robot_arm import RobotArm # Import the RobotArm module
# Initialize NLP model
nlp = spacy.load("en_core_web_sm")
# Initialize the robot arm with link lengths
robot_arm = RobotArm(link1_length=10, link2_length=10)
# Initialize text-to-speech engine
tts_engine = pyttsx3.init()
# Initialize object detection
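# (yolov3.weights, yolov3.cfg, and coco.names must be downloaded and placed alongside this script)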
net = cv2.dnn.readNet("yolov3.weights", "yolov3.cfg")
classes = []
with open("coco.names", "r") as f:
classes = [line.strip() for line in f.readlines()]
layer_names = net.getLayerNames()
# getUnconnectedOutLayers() returns (N, 1) arrays in older OpenCV builds and flat (N,) arrays in newer ones
output_layers = [layer_names[i - 1] for i in np.array(net.getUnconnectedOutLayers()).flatten()]
# Initialize MediaPipe hands
mp_hands = mp.solutions.hands
hands = mp_hands.Hands(static_image_mode=False, max_num_hands=1, min_detection_confidence=0.5)
mp_drawing = mp.solutions.drawing_utils
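# ContextManager keeps short-term memory across commands: recent actions, the last object
# mentioned, the last position used, and the latest object-detection results, so follow-up
# commands such as "move it" can fall back on context.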
class ContextManager:
def __init__(self):
self.previous_actions = []
self.current_object = None
self.current_position = None
self.detected_objects = {}
def update(self, action, objects=None, position=None):
self.previous_actions.append(action)
if objects:
self.current_object = objects[-1] # Last mentioned object
if position:
self.current_position = position
def get_context(self):
return {
"previous_actions": self.previous_actions[-3:], # Last 3 actions
"current_object": self.current_object,
"current_position": self.current_position,
"detected_objects": self.detected_objects
}
context_manager = ContextManager()
def speak(text):
"""Use text-to-speech to vocalize the given text."""
print(f"Robot says: {text}")
tts_engine.say(text)
tts_engine.runAndWait()
def listen_for_command():
    # Initialize the recognizer
    recognizer = sr.Recognizer()
    # Use the microphone as the source for input
    with sr.Microphone() as source:
        print("Listening for command...")
        try:
            # Wait at most a few seconds so the main loop is not blocked indefinitely
            audio = recognizer.listen(source, timeout=5, phrase_time_limit=5)
        except sr.WaitTimeoutError:
            print("No speech detected.")
            return None
    # Convert audio to text
    try:
        command_text = recognizer.recognize_google(audio)
        print(f"Command received: {command_text}")
        return command_text
    except sr.UnknownValueError:
        print("Could not understand audio. Please try again.")
        return None
    except sr.RequestError as e:
        print(f"Could not request results; {e}. Please check your network connection.")
        return None
def detect_objects():
    # Capture a single frame from the camera
    cap = cv2.VideoCapture(0)
    ret, frame = cap.read()
    cap.release()
    if not ret:
        print("Failed to capture a frame for object detection.")
        return {}
    height, width, channels = frame.shape
# Detecting objects
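    # blobFromImage scales pixels by 1/255 (0.00392), resizes to YOLOv3's 416x416 input, and swaps BGR to RGB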
blob = cv2.dnn.blobFromImage(frame, 0.00392, (416, 416), (0, 0, 0), True, crop=False)
net.setInput(blob)
outs = net.forward(output_layers)
# Information to return
class_ids = []
confidences = []
boxes = []
for out in outs:
for detection in out:
scores = detection[5:]
class_id = np.argmax(scores)
confidence = scores[class_id]
if confidence > 0.5:
# Object detected
center_x = int(detection[0] * width)
center_y = int(detection[1] * height)
w = int(detection[2] * width)
h = int(detection[3] * height)
# Rectangle coordinates
x = int(center_x - w / 2)
y = int(center_y - h / 2)
boxes.append([x, y, w, h])
confidences.append(float(confidence))
class_ids.append(class_id)
indexes = cv2.dnn.NMSBoxes(boxes, confidences, 0.5, 0.4)
detected_objects = {}
for i in range(len(boxes)):
if i in indexes:
label = str(classes[class_ids[i]])
detected_objects[label] = boxes[i]
context_manager.detected_objects = detected_objects
return detected_objects
def recognize_gesture(frame):
# Convert the BGR image to RGB
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Process the frame and detect hands
results = hands.process(rgb_frame)
if results.multi_hand_landmarks:
for hand_landmarks in results.multi_hand_landmarks:
# Draw hand landmarks on the frame
mp_drawing.draw_landmarks(frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)
# Get landmark positions
landmarks = [(lm.x, lm.y) for lm in hand_landmarks.landmark]
# Recognize gestures based on landmark positions
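            # MediaPipe hand landmark indices: 8 = index fingertip, 6 = index PIP joint,
            # 12 = middle fingertip, 10 = middle PIP, 16 = ring fingertip, 20 = pinky fingertip.
            # Image y grows downward, so tip y < PIP y means the finger is extended.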
if landmarks[8][1] < landmarks[6][1] and landmarks[12][1] > landmarks[10][1]:
return "point"
elif all(landmarks[i][1] < landmarks[i-2][1] for i in [8, 12, 16, 20]):
return "open_palm"
elif all(landmarks[i][1] > landmarks[i-2][1] for i in [8, 12, 16, 20]):
return "closed_fist"
return None
def process_gesture(gesture):
if gesture == "point":
speak("Detected pointing gesture. What would you like me to do?")
# Here you could implement logic to determine what the robot is pointing at
elif gesture == "open_palm":
speak("Detected open palm gesture. Stopping all current actions.")
# Implement stop action for the robot
elif gesture == "closed_fist":
speak("Detected closed fist gesture. Ready to receive new command.")
# Prepare the robot for a new command sequence
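# process_command: detect objects, register them as obstacles for the arm, parse the command
# with spaCy, and dispatch to the matching RobotArm action, falling back on stored context
# when no object or position is spoken.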
def process_command(command_text):
# Perform object detection
detected_objects = detect_objects()
speak(f"I have detected the following objects: {', '.join(detected_objects.keys())}")
# Set detected objects as obstacles for the robot arm
robot_arm.set_obstacles(list(detected_objects.values()))
# Parse the command using NLP
doc = nlp(command_text)
# Extract the verb (action) and objects from the command
action = None
objects = []
position = None
for token in doc:
if token.pos_ == "VERB":
action = token.lemma_
if token.pos_ in ["NOUN", "PROPN"]:
objects.append(token.text.lower())
        if token.like_num:
            try:
                value = float(token.text)
            except ValueError:
                # spaCy marks number words like "two" as like_num; skip ones float() cannot parse
                continue
            if position is None:
                position = [value]
            else:
                position.append(value)
# Use context if no objects or position are specified
context = context_manager.get_context()
if not objects and context["current_object"]:
objects = [context["current_object"]]
if not position and context["current_position"]:
position = context["current_position"]
# Define a dictionary of supported actions and their corresponding robot functions
actions = {
"lift": robot_arm.lift,
"bring": robot_arm.bring,
"move": robot_arm.move,
"grab": robot_arm.grab,
"put": robot_arm.put,
"go": robot_arm.move_to_position
}
# Check if the action is supported and execute the corresponding function
if action in actions:
if action == "go" and position and len(position) == 2:
speak(f"Moving to position: ({position[0]}, {position[1]}) while avoiding obstacles")
actions[action](position[0], position[1])
context_manager.update(action, position=position)
# Save the learned model after each successful movement
robot_arm.save_learned_model("learned_model.npy")
speak("Movement completed and model updated.")
elif objects:
speak(f"Executing: {action} {', '.join(objects)}")
actions[action](*objects)
context_manager.update(action, objects=objects)
else:
speak(f"Action '{action}' recognized, but no objects or position specified.")
else:
speak(f"Action '{action}' not recognized. Supported actions are: {', '.join(actions.keys())}")
# Handle multi-step tasks based on context
if "lift" in context["previous_actions"] and action == "move":
speak("Detected multi-step task: lift and move")
# Implement specific behavior for lift-and-move sequence
def main():
# Load the learned model if it exists
try:
robot_arm.load_learned_model("learned_model.npy")
speak("Loaded previously learned model.")
except FileNotFoundError:
speak("No previous model found. Starting with a new model.")
speak("Robot is ready. Waiting for your command or gesture.")
cap = cv2.VideoCapture(0)
while True:
ret, frame = cap.read()
if not ret:
speak("Failed to capture video. Please check your camera.")
break
# Perform gesture recognition
gesture = recognize_gesture(frame)
if gesture:
process_gesture(gesture)
# Display the frame
cv2.imshow('Gesture Recognition', frame)
        # Listen for a voice command (this blocks for up to the listen timeout)
command_text = listen_for_command()
if command_text:
process_command(command_text)
# Break the loop if 'q' is pressed
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
main()
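# robot_arm.py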
import numpy as np
from scipy.linalg import expm, logm
from robot_learner import RobotLearner
class RobotArm:
def __init__(self, link1_length, link2_length):
self.link1_length = link1_length
self.link2_length = link2_length
self.obstacles = []
self.learner = RobotLearner(state_size=100, action_size=8) # Discretize the space into 10x10 grid, 8 possible movements
def skew(self, v):
"""Convert a 3D vector to a skew-symmetric matrix."""
return np.array([[0, -v[2], v[1]],
[v[2], 0, -v[0]],
[-v[1], v[0], 0]])
def forward_kinematics(self, theta1, theta2):
"""Compute the end-effector position given joint angles."""
T1 = self.joint_transform(self.link1_length, theta1)
T2 = self.joint_transform(self.link2_length, theta2)
T = np.dot(T1, T2)
return T[:2, 3] # Return x, y coordinates
def joint_transform(self, length, theta):
"""Compute the transformation matrix for a joint."""
c, s = np.cos(theta), np.sin(theta)
return np.array([[c, -s, 0, length*c],
[s, c, 0, length*s],
[0, 0, 1, 0],
[0, 0, 0, 1]])
def compute_jacobian(self, theta1, theta2):
"""Compute the Jacobian matrix."""
J = np.zeros((2, 2))
J[:, 0] = [-self.link1_length*np.sin(theta1) - self.link2_length*np.sin(theta1 + theta2),
self.link1_length*np.cos(theta1) + self.link2_length*np.cos(theta1 + theta2)]
J[:, 1] = [-self.link2_length*np.sin(theta1 + theta2),
self.link2_length*np.cos(theta1 + theta2)]
return J
def inverse_kinematics(self, target_x, target_y, max_iterations=100, tolerance=1e-3):
"""Compute inverse kinematics using Lie group methods."""
        theta = np.array([0.0, 0.0])  # Initial guess (float dtype so the in-place update below works)
for _ in range(max_iterations):
current_pos = self.forward_kinematics(theta[0], theta[1])
error = np.array([target_x, target_y]) - current_pos
if np.linalg.norm(error) < tolerance:
return theta
J = self.compute_jacobian(theta[0], theta[1])
# Compute the twist
twist = np.linalg.pinv(J) @ error
# Convert twist to se(2) Lie algebra
se2_algebra = np.array([[0, -twist[1], twist[0]],
[twist[1], 0, 0],
[0, 0, 0]])
# Compute the incremental transformation
delta_T = expm(se2_algebra)
# Extract the rotation part
delta_R = delta_T[:2, :2]
# Convert rotation to angle
delta_theta = np.arctan2(delta_R[1, 0], delta_R[0, 0])
# Update joint angles
theta += np.array([delta_theta, delta_theta])
print("Inverse kinematics did not converge")
return None
def set_obstacles(self, obstacles):
"""Set the list of obstacles in the environment."""
self.obstacles = obstacles
def check_collision(self, x, y):
"""Check if the given position collides with any obstacle."""
for obstacle in self.obstacles:
ox, oy, w, h = obstacle
if ox < x < ox + w and oy < y < oy + h:
return True
return False
def find_path(self, start_x, start_y, goal_x, goal_y, max_steps=1000):
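        """Search for a path from start to goal, picking each step with the Q-learner and updating it from the reward received."""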
path = [(start_x, start_y)]
current_x, current_y = start_x, start_y
for _ in range(max_steps):
state = self.discretize_state(current_x, current_y)
action = self.learner.get_action(state)
next_x, next_y = self.get_next_position(current_x, current_y, action)
            distance_to_goal = np.sqrt((next_x - goal_x)**2 + (next_y - goal_y)**2)
            if self.check_collision(next_x, next_y):
                reward = -10  # Penalize collision
            else:
                reward = -distance_to_goal  # Reward is negative distance to goal
                if distance_to_goal < 0.1:
                    path.append((goal_x, goal_y))
                    print("Goal reached!")
                    return path
path.append((next_x, next_y))
current_x, current_y = next_x, next_y
next_state = self.discretize_state(next_x, next_y)
self.learner.update(state, action, reward, next_state)
print("Max steps reached, goal not found.")
return None
    def discretize_state(self, x, y):
        # Convert continuous (x, y) to a discrete state on the 10x10 grid,
        # clipping to the grid bounds so the index never leaves the Q-table
        max_reach = self.link1_length + self.link2_length
        x_discrete = int(np.clip(x * 10 / max_reach, 0, 9))
        y_discrete = int(np.clip(y * 10 / max_reach, 0, 9))
        return x_discrete * 10 + y_discrete
def get_next_position(self, current_x, current_y, action):
# Convert action to movement
angle = action * np.pi / 4 # 8 possible directions
step_size = 0.1
next_x = current_x + step_size * np.cos(angle)
next_y = current_y + step_size * np.sin(angle)
return next_x, next_y
def move_to_position(self, target_x, target_y):
current_pos = self.forward_kinematics(0, 0) # Assume starting position
path = self.find_path(current_pos[0], current_pos[1], target_x, target_y)
if path:
print(f"Path found to position ({target_x}, {target_y}). Following path:")
for x, y in path:
angles = self.inverse_kinematics(x, y)
if angles is not None:
print(f"Moving to intermediate position ({x:.2f}, {y:.2f}) with angles: {np.degrees(angles[0]):.2f}°, {np.degrees(angles[1]):.2f}°")
# Here you would add the code to physically move the robot arm
else:
print(f"Cannot move to intermediate position ({x:.2f}, {y:.2f})")
print("Reached target position")
else:
print("Cannot find a path to the specified position.")
def save_learned_model(self, filename):
self.learner.save_model(filename)
def load_learned_model(self, filename):
self.learner.load_model(filename)
def lift(self, *objects):
for obj in objects:
if self.is_within_reach(obj) and self.is_weight_supported(obj):
print(f"Lifting {obj}")
# Simulate the lifting process
self.perform_lift(obj)
print(f"{obj} has been lifted successfully.")
else:
print(f"Cannot lift {obj}. It may be out of reach or too heavy.")
def is_within_reach(self, obj):
# Placeholder logic to check if the object is within reach
# In a real scenario, this would involve sensor data or predefined coordinates
print(f"Checking if {obj} is within reach...")
return True # Assume all objects are within reach for this example
def is_weight_supported(self, obj):
# Placeholder logic to check if the object's weight is supported
# In a real scenario, this would involve weight sensors or predefined weight limits
print(f"Checking if {obj} is within weight limits...")
return True # Assume all objects are within weight limits for this example
def perform_lift(self, obj):
# Placeholder for the actual lifting mechanism
# This could involve sending commands to motors or actuators
print(f"Performing lift operation for {obj}...")
def bring(self, *objects):
print(f"Bringing {', '.join(objects)}")
def move(self, *objects):
print(f"Moving {', '.join(objects)}")
def grab(self, *objects):
print(f"Grabbing {', '.join(objects)}")
def put(self, *objects):
print(f"Putting {', '.join(objects)}")
# Example usage
if __name__ == "__main__":
arm = RobotArm(link1_length=10, link2_length=10)
arm.move_to_position(15, 5)
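# robot_learner.py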
import numpy as np
import random
class RobotLearner:
def __init__(self, state_size, action_size, learning_rate=0.1, discount_factor=0.95, exploration_rate=1.0, exploration_decay=0.995):
self.state_size = state_size
self.action_size = action_size
self.learning_rate = learning_rate
self.discount_factor = discount_factor
self.exploration_rate = exploration_rate
self.exploration_min = 0.01
self.exploration_decay = exploration_decay
self.q_table = np.zeros((state_size, action_size))
def get_action(self, state):
if random.uniform(0, 1) < self.exploration_rate:
return random.randint(0, self.action_size - 1)
return np.argmax(self.q_table[state])
def update(self, state, action, reward, next_state):
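        # Standard Q-learning update:
        # Q(s, a) <- Q(s, a) + lr * (reward + gamma * max_a' Q(s', a') - Q(s, a))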
current_q = self.q_table[state, action]
max_next_q = np.max(self.q_table[next_state])
new_q = current_q + self.learning_rate * (reward + self.discount_factor * max_next_q - current_q)
self.q_table[state, action] = new_q
self.exploration_rate = max(self.exploration_min, self.exploration_rate * self.exploration_decay)
def save_model(self, filename):
np.save(filename, self.q_table)
def load_model(self, filename):
self.q_table = np.load(filename)
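To see the learning component in isolation, here is a minimal, self-contained sketch that trains RobotLearner on a toy 10x10 grid. The grid, the four-move action set, the reward shaping, and the toy_model.npy filename are illustrative assumptions made for this sketch only; they are not part of main.py or robot_arm.py, which drive the learner through find_path instead.
# Toy training loop for RobotLearner (illustrative only; the grid and rewards are assumptions)
import numpy as np
from robot_learner import RobotLearner

learner = RobotLearner(state_size=100, action_size=4)  # 10x10 grid, 4 moves
moves = [(1, 0), (0, 1), (-1, 0), (0, -1)]  # right, up, left, down

for episode in range(200):
    x, y = 0, 0  # start in one corner; the goal is the opposite corner (9, 9)
    for step in range(100):
        state = x * 10 + y
        action = learner.get_action(state)
        dx, dy = moves[action]
        nx, ny = min(max(x + dx, 0), 9), min(max(y + dy, 0), 9)
        # Negative distance to the goal, mirroring the reward shaping in find_path
        reward = -(abs(9 - nx) + abs(9 - ny))
        learner.update(state, action, reward, nx * 10 + ny)
        x, y = nx, ny
        if (x, y) == (9, 9):
            break

learner.save_model("toy_model.npy")
print("Greedy value at the start state:", np.max(learner.q_table[0]))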