-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
370 lines (319 loc) · 12.8 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
import time, subprocess, wave, os, re, sounddevice, warnings, threading
import speech_recognition as sr
from openai import OpenAI
import multiprocessing
import threading
from gtts import gTTS
import pygame
import os
from dotenv import load_dotenv
import traceback
from Modules import music_player, diary, hygrometer, alarm_clock, light_control, web_console
web_console.start_web_console()
#multiprocessing.set_start_method("spawn", force=True)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
WAKE_WORD = "assistant"
USER_NAME = "Luke"

# Read OPENAI_API_KEY from the local .env file (if present) into the environment.
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")

# Report only whether a key was found; the key itself is never printed.
print("API Key loaded successfully!" if api_key else "Error: API Key not loaded!")

# OpenAI client used by ask_openai().
client = OpenAI(api_key=api_key)

# Latest worker (multiprocessing.Process or threading.Thread) per command name.
process_dict = {}

# Names handed to stop_processes() before a new command is executed.
# NOTE(review): handle_voice_command registers alarm workers as 'set_alarm' and
# 'delete_alarm'; "alarm_clock" is never used as a key in this file - confirm.
stoppable_processes = {
    "play_music",
    "diary_read_entry",
    "diary_create_entry",
    "hygrometer_read_humidity",
    "hygrometer_read_temperature",
    "ask_openai",
    "alarm_clock",
}

# Shared boolean flag that workers and speak() poll to know when to stop.
stop_flag = multiprocessing.Value("b", False)

# Disabled alternatives for silencing ALSA/JACK noise on stderr:
#sys.stderr = open(os.devnull, 'w')
#warnings.filterwarnings("ignore")  # Suppress other warnings

# Audio-related environment settings.
# NOTE(review): PYGAME_HIDE_SUPPORT_PROMPT is set after `import pygame` has
# already run, so it probably does not hide the banner - confirm.
os.environ.update({
    "SDL_AUDIODRIVER": "pulse",            # Route SDL audio through PulseAudio
    "PYGAME_HIDE_SUPPORT_PROMPT": "hide",  # Suppress pygame startup message
    "ALSA_LOG_LEVEL": "none",              # Suppress ALSA messages
})

# speech_recognition tuning.
# NOTE(review): these assign attributes on the speech_recognition *module*; the
# Recognizer built in listen() does not read them, so they look like no-ops.
#sr.energy_threshold = 50  # Default is around 300; increase for noisy environments
sr.dynamic_energy_threshold = True  # Automatically adjust during runtime
sr.pause_threshold = 1.0  # Default is 0.8; increase to 1.0 or higher

# Initialise the pygame mixer with standard audio settings, then pygame itself.
pygame.mixer.pre_init(44100, -16, 2, 512)
pygame.init()
def speak(text):
    """Speak *text* aloud and block until playback ends or is interrupted.

    The text is echoed to the console, synthesised to a temporary mp3 with
    gTTS, and played through the pygame mixer. While audio is playing the
    shared ``stop_flag`` is polled every 0.1 s; when it is set, playback is
    stopped and the flag is cleared.

    Args:
        text: The sentence to speak. Empty or whitespace-only text is echoed
            but not synthesised, because gTTS refuses empty input.
    """
    print(f"Assistant: {text}")
    if not text or not text.strip():
        # Nothing to synthesise; avoid a gTTS failure on empty input.
        return

    audio_file = "output.mp3"
    try:
        gTTS(text=text, lang="en").save(audio_file)

        # Play the audio using pygame
        pygame.mixer.music.load(audio_file)
        pygame.mixer.music.play()

        # Wait for playback to finish, checking the stop flag periodically
        while pygame.mixer.music.get_busy():
            if stop_flag.value:
                print("Stop flag detected. Stopping playback.")
                pygame.mixer.music.stop()
                stop_flag.value = False  # Reset for future playback
                break
            time.sleep(0.1)
    finally:
        # Always remove the temporary file, even if synthesis or playback
        # raised part-way through.
        if os.path.exists(audio_file):
            os.remove(audio_file)
def speak_async(text):
    """Speak *text* on a background daemon thread and return immediately."""
    speaker = threading.Thread(target=speak, args=(text,), daemon=True)
    speaker.start()
def listen():
    """Record one utterance from the default microphone and transcribe it.

    Returns:
        The recognised text in lower case, or ``""`` when the audio could not
        be understood or the recognition service could not be reached.
    """
    recognizer = sr.Recognizer()
    # These thresholds are per-Recognizer settings; assigning them on the
    # speech_recognition module does not configure this instance.
    recognizer.dynamic_energy_threshold = True  # Adapt to ambient noise at runtime
    recognizer.pause_threshold = 1.0  # Seconds of silence that end a phrase

    try:
        with sr.Microphone() as source:
            print("Listening...")
            audio = recognizer.listen(source)
        command = recognizer.recognize_google(audio)
        print(f"You said: {command}")
        return command.lower()
    except sr.UnknownValueError:
        print("No Input Detected")
        return ""
    except sr.RequestError:
        # The spoken notice also needs network access (gTTS), so it may fail
        # for the same reason; never let that take down the caller's loop.
        try:
            speak("Could not connect to the speech service")
        except Exception:
            traceback.print_exc()
        return ""
def detect_command(input_text, keyword):
    """Check whether *keyword* occurs in *input_text*.

    Returns:
        ``(True, remainder)`` where *remainder* is everything after the first
        occurrence of *keyword* (not stripped), or ``(False, None)`` when the
        keyword is absent.
    """
    if keyword not in input_text:
        return False, None
    remainder_start = input_text.find(keyword) + len(keyword)
    return True, input_text[remainder_start:]
def ask_openai(prompt):
    """Send *prompt* to the OpenAI chat API and return the reply text.

    Any exception raised while querying or reading the response is printed
    with its traceback and the fixed string "Error when asking OpenAI." is
    returned instead.
    """
    print("Asking OpenAI...")
    conversation = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompt},
    ]
    try:
        completion = client.chat.completions.create(
            model="gpt-4o",  # Use "gpt-4" or "gpt-3.5-turbo"
            messages=conversation,
        )
        first_choice = completion.choices[0]
        response = first_choice.message.content.strip()
        # Echo the reply to the console
        print("Response from OpenAI:", response)
        return response
    except Exception:
        traceback.print_exc()
        return "Error when asking OpenAI."
def ask_openai_worker(prompt, output_queue):
    """Query OpenAI with *prompt* and put the reply on *output_queue*.

    Lets the query run as a subprocess target without the subprocess having
    to call speak() itself.
    """
    output_queue.put(ask_openai(prompt))
def stop_processes(process_names):
    """Signal and stop the workers registered under *process_names*.

    Raises the shared ``stop_flag``, silences any speech currently playing
    through the pygame mixer, then terminates each live process and briefly
    waits for each live thread found in ``process_dict``. The flag is lowered
    again before returning, even if stopping a worker raises.

    Args:
        process_names: Iterable of ``process_dict`` keys to stop. Names with
            no registered worker are ignored.
    """
    stop_flag.value = True
    try:
        # Threads started by speak_async() are not tracked in process_dict and
        # only poll the flag every 0.1 s, so they would miss a flag that is
        # raised and lowered within this call. Stop their playback directly.
        if pygame.mixer.get_init() and pygame.mixer.music.get_busy():
            pygame.mixer.music.stop()

        for process_name in process_names:
            worker = process_dict.get(process_name)
            if not worker:
                continue

            if isinstance(worker, multiprocessing.Process):
                if worker.is_alive():
                    print(f"Stopping process: {process_name}")
                    worker.terminate()
                    worker.join()
            elif isinstance(worker, threading.Thread):
                if worker.is_alive():
                    print(f"Stopping thread: {process_name}")
                    # Threads cannot be killed; give it up to a second to
                    # notice the stop flag and finish.
                    worker.join(timeout=1)
            else:
                print(f"Unknown process type for {process_name}, cannot stop.")
    finally:
        stop_flag.value = False  # Reset for future use
# Hue/saturation presets passed to light_control.set_color for spoken colours.
_LIGHT_COLOR_MAP = {
    "red": {"h": 0, "s": 1000},
    "green": {"h": 120, "s": 1000},
    "blue": {"h": 240, "s": 1000},
    "yellow": {"h": 60, "s": 1000},
    "purple": {"h": 270, "s": 1000},
    "pink": {"h": 330, "s": 1000},
    "white": {"h": 0, "s": 0},  # White: saturation 0
    "orange": {"h": 30, "s": 1000},
    "cyan": {"h": 180, "s": 1000},
    "magenta": {"h": 300, "s": 1000},
}


def _start_worker(name, worker):
    """Record *worker* in process_dict under *name*, then start it."""
    process_dict[name] = worker
    worker.start()


def _handle_light_command(light_command):
    """Interpret the words following "set light" and drive light_control."""
    # Lower-case and drop the filler word "brightness"
    params = light_command.lower().replace("brightness", "")

    # Whole-word matches only, so words such as "one" do not read as "on".
    if re.search(r"\bon\b", params):
        light_control.turn_on()
        return
    if re.search(r"\boff\b", params):
        light_control.turn_off()
        return

    selected_color = next(
        (color for color in _LIGHT_COLOR_MAP if color in params), None
    )

    # Look for a brightness percentage, e.g. "50 percent" or "50%"
    brightness_value = None
    brightness_match = re.search(r'(\d+)\s*(?:percent|%)', params)
    if brightness_match:
        # Cap at 100 % and use integer maths (1 % == 10 units on the 0-1000
        # scale) so float rounding cannot truncate the value.
        percent = min(int(brightness_match.group(1)), 100)
        brightness_value = percent * 10

    if selected_color:
        preset = _LIGHT_COLOR_MAP[selected_color]
        if brightness_value is None:
            brightness_value = 1000  # Default full brightness
        light_control.set_color(
            hue=preset["h"], saturation=preset["s"], brightness=brightness_value
        )
    elif brightness_value is not None:
        # Only brightness given: adjust it while preserving the current colour.
        light_control.adjust_brightness(brightness=brightness_value)
    else:
        speak(
            "I didn't understand the light command. Please say a color (red, blue, etc.) or specify a brightness percentage.")


def handle_voice_command(user_input):
    """Route a recognised command to the matching module.

    Keywords are tested in a fixed order (play music, create diary, read
    diary, temperature, humidity, set alarm, delete alarm, set light, stop);
    the first match wins. Anything else is sent to OpenAI and the answer is
    spoken asynchronously. Before a command runs, the workers named in
    ``stoppable_processes`` are stopped.

    NOTE(review): alarm workers are registered as 'set_alarm' and
    'delete_alarm', which are not in ``stoppable_processes`` - confirm intent.

    Args:
        user_input: The lower-cased command text (wake word already removed).
    """
    matched, song_name = detect_command(user_input, "play music")
    if matched:
        print("Music command detected. Opening music_player module...")
        # The remainder starts with the space that followed the keyword.
        song_name = song_name.strip()
        if song_name:
            stop_processes(stoppable_processes)
            _start_worker('play_music', multiprocessing.Process(
                target=music_player.play_song,
                args=(song_name, stop_flag),
            ))
        else:
            speak("Please specify the song you'd like to play.")
        return

    matched, _ = detect_command(user_input, "create diary")
    if matched:
        print("Diary entry command detected. Starting a new diary entry...")
        stop_processes(stoppable_processes)
        _start_worker('diary_create_entry', threading.Thread(
            target=diary.create_entry,
            args=(speak, listen, stop_flag),
            daemon=True,
        ))
        return

    matched, _ = detect_command(user_input, "read diary")
    if matched:
        print("Read diary entry command detected. Fetching the diary entry...")
        stop_processes(stoppable_processes)
        _start_worker('diary_read_entry', threading.Thread(
            target=diary.read_entry,
            args=(speak, listen, stop_flag),
            daemon=True,
        ))
        return

    matched, _ = detect_command(user_input, "temperature")
    if matched:
        print("Read temperature command detected. Fetching temperature...")
        stop_processes(stoppable_processes)
        _start_worker('hygrometer_read_temperature', threading.Thread(
            target=hygrometer.read_temperature,
            args=(speak,),
        ))
        return

    matched, _ = detect_command(user_input, "humidity")
    if matched:
        print("Read humidity command detected. Fetching the humidity...")
        stop_processes(stoppable_processes)
        _start_worker('hygrometer_read_humidity', threading.Thread(
            target=hygrometer.read_humidity,
            args=(speak,),
        ))
        return

    matched, _ = detect_command(user_input, "set alarm")
    if matched:
        print("Setting Up Alarm...")
        stop_processes(stoppable_processes)
        # The interactive setup runs in a thread rather than a new process.
        _start_worker('set_alarm', threading.Thread(
            target=alarm_clock.setup_alarm,
            args=(speak, listen, stop_flag),
            daemon=True,
        ))
        return

    matched, alarm_text = detect_command(user_input, "delete alarm")
    if matched:
        print("Deleting Alarm...")
        stop_processes(stoppable_processes)
        _start_worker('delete_alarm', multiprocessing.Process(
            target=alarm_clock.delete_alarm,
            args=(alarm_text.strip(), speak),
        ))
        return

    matched, light_command = detect_command(user_input, "set light")
    if matched:
        print("Light control command detected.")
        stop_processes(stoppable_processes)
        _handle_light_command(light_command)
        return

    matched, _ = detect_command(user_input, "stop")
    if matched:
        print("Stopping process...")
        stop_processes(stoppable_processes)
        return

    # Default: send the query to OpenAI in a subprocess and speak the answer.
    stop_processes(stoppable_processes)
    result_queue = multiprocessing.Queue()
    openai_process = multiprocessing.Process(
        target=ask_openai_worker,
        args=(user_input, result_queue),
    )
    _start_worker('ask_openai', openai_process)
    # Read the result before joining so the child is never left blocked on
    # a queue nobody drains.
    response = result_queue.get()
    openai_process.join()
    speak_async(response)
def voice_assistant():
    """Run the wake-word loop forever.

    Listens continuously; when an utterance contains ``WAKE_WORD`` the text
    after it is treated as the command. If nothing follows the wake word the
    assistant prompts for a command and listens once more. Empty commands are
    dropped instead of being forwarded to handle_voice_command.
    """
    #speak(f"Hello {USER_NAME}. I am your voice assistant. Say {WAKE_WORD} to wake me up.")
    while True:
        audio_input = listen()
        detected, command = detect_command(audio_input, WAKE_WORD)
        if not detected:
            continue

        # The remainder keeps the space that followed the wake word.
        command = command.strip()
        if not command:
            print("Wake word detected")
            speak("How can I help you?")
            command = listen().strip()
            if not command:
                # Nothing intelligible was heard; do not send an empty
                # prompt down the default OpenAI path.
                continue

        handle_voice_command(command)
# Start the assistant loop only when this file is executed as a script.
if __name__ == "__main__":
    voice_assistant()