Create a Simple Robot Arm Movement File 3

This is the third blog in a series to develop a comprehensive suite of codes for robot arm software that could potentially become a realistic application in life or industry usage. The following features are added:

Learning capabilities: Implement a machine learning model that allows the robot to learn new tasks or improve its performance over time.

Voice synthesis: Add text-to-speech capabilities for vocal responses from the robot.

Gesture recognition: Implement the ability to understand and respond to hand gestures.

Main.py, robot_arm.py and robot_learner.py

# Import necessary libraries
import speech_recognition as sr
import spacy
import cv2 # python -m pip install opencv-python
import numpy as np
import pyttsx3 # python -m pip install pyttsx3
import mediapipe as mp
from robot_arm import RobotArm  # Import the RobotArm module

# Initialize NLP model
nlp = spacy.load("en_core_web_sm")

# Initialize the robot arm with link lengths
robot_arm = RobotArm(link1_length=10, link2_length=10)

# Initialize text-to-speech engine
tts_engine = pyttsx3.init()

# Initialize object detection
net = cv2.dnn.readNet("yolov3.weights", "yolov3.cfg")
classes = []
with open("coco.names", "r") as f:
    classes = [line.strip() for line in f.readlines()]
layer_names = net.getLayerNames()
output_layers = [layer_names[i[0] - 1] for i in net.getUnconnectedOutLayers()]

# Initialize MediaPipe hands
mp_hands = mp.solutions.hands
hands = mp_hands.Hands(static_image_mode=False, max_num_hands=1, min_detection_confidence=0.5)
mp_drawing = mp.solutions.drawing_utils

class ContextManager:
    def __init__(self):
        self.previous_actions = []
        self.current_object = None
        self.current_position = None
        self.detected_objects = {}

    def update(self, action, objects=None, position=None):
        self.previous_actions.append(action)
        if objects:
            self.current_object = objects[-1]  # Last mentioned object
        if position:
            self.current_position = position

    def get_context(self):
        return {
            "previous_actions": self.previous_actions[-3:],  # Last 3 actions
            "current_object": self.current_object,
            "current_position": self.current_position,
            "detected_objects": self.detected_objects
        }

context_manager = ContextManager()

def speak(text):
    """Use text-to-speech to vocalize the given text."""
    print(f"Robot says: {text}")
    tts_engine.say(text)
    tts_engine.runAndWait()

def listen_for_command():
    # Initialize the recognizer
    recognizer = sr.Recognizer()

    # Use the microphone as the source for input
    with sr.Microphone() as source:
        print("Listening for command...")
        audio = recognizer.listen(source)

    # Convert audio to text
    try:
        command_text = recognizer.recognize_google(audio)
        print(f"Command received: {command_text}")
        return command_text
    except sr.UnknownValueError:
        print("Could not understand audio. Please try again.")
        return None
    except sr.RequestError as e:
        print(f"Could not request results; {e}. Please check your network connection.")
        return None

def detect_objects():
    # Capture frame from camera
    cap = cv2.VideoCapture(0)
    ret, frame = cap.read()
    cap.release()

    height, width, channels = frame.shape

    # Detecting objects
    blob = cv2.dnn.blobFromImage(frame, 0.00392, (416, 416), (0, 0, 0), True, crop=False)
    net.setInput(blob)
    outs = net.forward(output_layers)

    # Information to return
    class_ids = []
    confidences = []
    boxes = []

    for out in outs:
        for detection in out:
            scores = detection[5:]
            class_id = np.argmax(scores)
            confidence = scores[class_id]
            if confidence > 0.5:
                # Object detected
                center_x = int(detection[0] * width)
                center_y = int(detection[1] * height)
                w = int(detection[2] * width)
                h = int(detection[3] * height)

                # Rectangle coordinates
                x = int(center_x - w / 2)
                y = int(center_y - h / 2)

                boxes.append([x, y, w, h])
                confidences.append(float(confidence))
                class_ids.append(class_id)

    indexes = cv2.dnn.NMSBoxes(boxes, confidences, 0.5, 0.4)

    detected_objects = {}
    for i in range(len(boxes)):
        if i in indexes:
            label = str(classes[class_ids[i]])
            detected_objects[label] = boxes[i]

    context_manager.detected_objects = detected_objects
    return detected_objects

def recognize_gesture(frame):
    # Convert the BGR image to RGB
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    
    # Process the frame and detect hands
    results = hands.process(rgb_frame)
    
    if results.multi_hand_landmarks:
        for hand_landmarks in results.multi_hand_landmarks:
            # Draw hand landmarks on the frame
            mp_drawing.draw_landmarks(frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)
            
            # Get landmark positions
            landmarks = [(lm.x, lm.y) for lm in hand_landmarks.landmark]
            
            # Recognize gestures based on landmark positions
            if landmarks[8][1] < landmarks[6][1] and landmarks[12][1] > landmarks[10][1]:
                return "point"
            elif all(landmarks[i][1] < landmarks[i-2][1] for i in [8, 12, 16, 20]):
                return "open_palm"
            elif all(landmarks[i][1] > landmarks[i-2][1] for i in [8, 12, 16, 20]):
                return "closed_fist"
    
    return None

def process_gesture(gesture):
    if gesture == "point":
        speak("Detected pointing gesture. What would you like me to do?")
        # Here you could implement logic to determine what the robot is pointing at
    elif gesture == "open_palm":
        speak("Detected open palm gesture. Stopping all current actions.")
        # Implement stop action for the robot
    elif gesture == "closed_fist":
        speak("Detected closed fist gesture. Ready to receive new command.")
        # Prepare the robot for a new command sequence

def process_command(command_text):
    # Perform object detection
    detected_objects = detect_objects()
    speak(f"I have detected the following objects: {', '.join(detected_objects.keys())}")

    # Set detected objects as obstacles for the robot arm
    robot_arm.set_obstacles(list(detected_objects.values()))

    # Parse the command using NLP
    doc = nlp(command_text)

    # Extract the verb (action) and objects from the command
    action = None
    objects = []
    position = None

    for token in doc:
        if token.pos_ == "VERB":
            action = token.lemma_
        if token.pos_ in ["NOUN", "PROPN"]:
            objects.append(token.text.lower())
        if token.like_num:
            if position is None:
                position = [float(token.text)]
            else:
                position.append(float(token.text))

    # Use context if no objects or position are specified
    context = context_manager.get_context()
    if not objects and context["current_object"]:
        objects = [context["current_object"]]
    if not position and context["current_position"]:
        position = context["current_position"]

    # Define a dictionary of supported actions and their corresponding robot functions
    actions = {
        "lift": robot_arm.lift,
        "bring": robot_arm.bring,
        "move": robot_arm.move,
        "grab": robot_arm.grab,
        "put": robot_arm.put,
        "go": robot_arm.move_to_position
    }

    # Check if the action is supported and execute the corresponding function
    if action in actions:
        if action == "go" and position and len(position) == 2:
            speak(f"Moving to position: ({position[0]}, {position[1]}) while avoiding obstacles")
            actions[action](position[0], position[1])
            context_manager.update(action, position=position)
            
            # Save the learned model after each successful movement
            robot_arm.save_learned_model("learned_model.npy")
            speak("Movement completed and model updated.")
        elif objects:
            speak(f"Executing: {action} {', '.join(objects)}")
            actions[action](*objects)
            context_manager.update(action, objects=objects)
        else:
            speak(f"Action '{action}' recognized, but no objects or position specified.")
    else:
        speak(f"Action '{action}' not recognized. Supported actions are: {', '.join(actions.keys())}")

    # Handle multi-step tasks based on context
    if "lift" in context["previous_actions"] and action == "move":
        speak("Detected multi-step task: lift and move")
        # Implement specific behavior for lift-and-move sequence

def main():
    # Load the learned model if it exists
    try:
        robot_arm.load_learned_model("learned_model.npy")
        speak("Loaded previously learned model.")
    except FileNotFoundError:
        speak("No previous model found. Starting with a new model.")

    speak("Robot is ready. Waiting for your command or gesture.")

    cap = cv2.VideoCapture(0)

    while True:
        ret, frame = cap.read()
        if not ret:
            speak("Failed to capture video. Please check your camera.")
            break

        # Perform gesture recognition
        gesture = recognize_gesture(frame)
        if gesture:
            process_gesture(gesture)

        # Display the frame
        cv2.imshow('Gesture Recognition', frame)

        # Check for voice command (non-blocking)
        command_text = listen_for_command()
        if command_text:
            process_command(command_text)

        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()
import numpy as np
from scipy.linalg import expm, logm
from robot_learner import RobotLearner

class RobotArm:
    def __init__(self, link1_length, link2_length):
        self.link1_length = link1_length
        self.link2_length = link2_length
        self.obstacles = []
        self.learner = RobotLearner(state_size=100, action_size=8)  # Discretize the space into 10x10 grid, 8 possible movements

    def skew(self, v):
        """Convert a 3D vector to a skew-symmetric matrix."""
        return np.array([[0, -v[2], v[1]],
                         [v[2], 0, -v[0]],
                         [-v[1], v[0], 0]])

    def forward_kinematics(self, theta1, theta2):
        """Compute the end-effector position given joint angles."""
        T1 = self.joint_transform(self.link1_length, theta1)
        T2 = self.joint_transform(self.link2_length, theta2)
        T = np.dot(T1, T2)
        return T[:2, 3]  # Return x, y coordinates

    def joint_transform(self, length, theta):
        """Compute the transformation matrix for a joint."""
        c, s = np.cos(theta), np.sin(theta)
        return np.array([[c, -s, 0, length*c],
                         [s, c, 0, length*s],
                         [0, 0, 1, 0],
                         [0, 0, 0, 1]])

    def compute_jacobian(self, theta1, theta2):
        """Compute the Jacobian matrix."""
        J = np.zeros((2, 2))
        J[:, 0] = [-self.link1_length*np.sin(theta1) - self.link2_length*np.sin(theta1 + theta2),
                   self.link1_length*np.cos(theta1) + self.link2_length*np.cos(theta1 + theta2)]
        J[:, 1] = [-self.link2_length*np.sin(theta1 + theta2),
                   self.link2_length*np.cos(theta1 + theta2)]
        return J

    def inverse_kinematics(self, target_x, target_y, max_iterations=100, tolerance=1e-3):
        """Compute inverse kinematics using Lie group methods."""
        theta = np.array([0, 0])  # Initial guess
        for _ in range(max_iterations):
            current_pos = self.forward_kinematics(theta[0], theta[1])
            error = np.array([target_x, target_y]) - current_pos
            
            if np.linalg.norm(error) < tolerance:
                return theta

            J = self.compute_jacobian(theta[0], theta[1])
            
            # Compute the twist
            twist = np.linalg.pinv(J) @ error
            
            # Convert twist to se(2) Lie algebra
            se2_algebra = np.array([[0, -twist[1], twist[0]],
                                    [twist[1], 0, 0],
                                    [0, 0, 0]])
            
            # Compute the incremental transformation
            delta_T = expm(se2_algebra)
            
            # Extract the rotation part
            delta_R = delta_T[:2, :2]
            
            # Convert rotation to angle
            delta_theta = np.arctan2(delta_R[1, 0], delta_R[0, 0])
            
            # Update joint angles
            theta += np.array([delta_theta, delta_theta])

        print("Inverse kinematics did not converge")
        return None

    def set_obstacles(self, obstacles):
        """Set the list of obstacles in the environment."""
        self.obstacles = obstacles

    def check_collision(self, x, y):
        """Check if the given position collides with any obstacle."""
        for obstacle in self.obstacles:
            ox, oy, w, h = obstacle
            if ox < x < ox + w and oy < y < oy + h:
                return True
        return False

    def find_path(self, start_x, start_y, goal_x, goal_y, max_steps=1000):
        path = [(start_x, start_y)]
        current_x, current_y = start_x, start_y

        for _ in range(max_steps):
            state = self.discretize_state(current_x, current_y)
            action = self.learner.get_action(state)
            next_x, next_y = self.get_next_position(current_x, current_y, action)

            if self.check_collision(next_x, next_y):
                reward = -10  # Penalize collision
            else:
                distance_to_goal = np.sqrt((next_x - goal_x)**2 + (next_y - goal_y)**2)
                reward = -distance_to_goal  # Reward is negative distance to goal

                if distance_to_goal < 0.1:
                    path.append((goal_x, goal_y))
                    print("Goal reached!")
                    return path

                path.append((next_x, next_y))
                current_x, current_y = next_x, next_y

            next_state = self.discretize_state(next_x, next_y)
            self.learner.update(state, action, reward, next_state)

        print("Max steps reached, goal not found.")
        return None

    def discretize_state(self, x, y):
        # Convert continuous (x, y) to discrete state
        x_discrete = int(x * 10 / (self.link1_length + self.link2_length))
        y_discrete = int(y * 10 / (self.link1_length + self.link2_length))
        return x_discrete * 10 + y_discrete

    def get_next_position(self, current_x, current_y, action):
        # Convert action to movement
        angle = action * np.pi / 4  # 8 possible directions
        step_size = 0.1
        next_x = current_x + step_size * np.cos(angle)
        next_y = current_y + step_size * np.sin(angle)
        return next_x, next_y

    def move_to_position(self, target_x, target_y):
        current_pos = self.forward_kinematics(0, 0)  # Assume starting position
        path = self.find_path(current_pos[0], current_pos[1], target_x, target_y)

        if path:
            print(f"Path found to position ({target_x}, {target_y}). Following path:")
            for x, y in path:
                angles = self.inverse_kinematics(x, y)
                if angles is not None:
                    print(f"Moving to intermediate position ({x:.2f}, {y:.2f}) with angles: {np.degrees(angles[0]):.2f}°, {np.degrees(angles[1]):.2f}°")
                    # Here you would add the code to physically move the robot arm
                else:
                    print(f"Cannot move to intermediate position ({x:.2f}, {y:.2f})")
            print("Reached target position")
        else:
            print("Cannot find a path to the specified position.")

    def save_learned_model(self, filename):
        self.learner.save_model(filename)

    def load_learned_model(self, filename):
        self.learner.load_model(filename)

    def lift(self, *objects):
        for obj in objects:
            if self.is_within_reach(obj) and self.is_weight_supported(obj):
                print(f"Lifting {obj}")
                # Simulate the lifting process
                self.perform_lift(obj)
                print(f"{obj} has been lifted successfully.")
            else:
                print(f"Cannot lift {obj}. It may be out of reach or too heavy.")

    def is_within_reach(self, obj):
        # Placeholder logic to check if the object is within reach
        # In a real scenario, this would involve sensor data or predefined coordinates
        print(f"Checking if {obj} is within reach...")
        return True  # Assume all objects are within reach for this example

    def is_weight_supported(self, obj):
        # Placeholder logic to check if the object's weight is supported
        # In a real scenario, this would involve weight sensors or predefined weight limits
        print(f"Checking if {obj} is within weight limits...")
        return True  # Assume all objects are within weight limits for this example

    def perform_lift(self, obj):
        # Placeholder for the actual lifting mechanism
        # This could involve sending commands to motors or actuators
        print(f"Performing lift operation for {obj}...")

    def bring(self, *objects):
        print(f"Bringing {', '.join(objects)}")

    def move(self, *objects):
        print(f"Moving {', '.join(objects)}")

    def grab(self, *objects):
        print(f"Grabbing {', '.join(objects)}")

    def put(self, *objects):
        print(f"Putting {', '.join(objects)}")

# Example usage
if __name__ == "__main__":
    arm = RobotArm(link1_length=10, link2_length=10)
    arm.move_to_position(15, 5)

import numpy as np
import random

class RobotLearner:
    def __init__(self, state_size, action_size, learning_rate=0.1, discount_factor=0.95, exploration_rate=1.0, exploration_decay=0.995):
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.exploration_min = 0.01
        self.exploration_decay = exploration_decay
        self.q_table = np.zeros((state_size, action_size))

    def get_action(self, state):
        if random.uniform(0, 1) < self.exploration_rate:
            return random.randint(0, self.action_size - 1)
        return np.argmax(self.q_table[state])

    def update(self, state, action, reward, next_state):
        current_q = self.q_table[state, action]
        max_next_q = np.max(self.q_table[next_state])
        new_q = current_q + self.learning_rate * (reward + self.discount_factor * max_next_q - current_q)
        self.q_table[state, action] = new_q

        self.exploration_rate = max(self.exploration_min, self.exploration_rate * self.exploration_decay)

    def save_model(self, filename):
        np.save(filename, self.q_table)

    def load_model(self, filename):
        self.q_table = np.load(filename)

Leave a comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.