-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathArctis_7_Plus_ChatMix.py
231 lines (190 loc) · 9.31 KB
/
Arctis_7_Plus_ChatMix.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
""" Copyright (C) 2022 birdybirdonline & awth13 - see LICENSE.md
@ https://github.com/birdybirdonline/Linux-Arctis-7-Plus-ChatMix
Contact via Github in the first instance
https://github.com/birdybirdonline
https://github.com/awth13
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import os
import sys
import signal
import logging
import traceback
import re
import usb.core
class Arctis7PlusChatMix:
def __init__(self):
# set to receive signal from systemd for termination
signal.signal(signal.SIGTERM, self.__handle_sigterm)
self.log = self._init_log()
self.log.info("Initializing ac7pcm...")
# identify the arctis 7+ device
try:
self.dev=usb.core.find(idVendor=0x1038, idProduct=0x220e)
except Exception as e:
self.log.error("""Failed to identify the Arctis 7+ device.
Please ensure it is connected.\n
Please note: This program only supports the '7+' model.""")
self.die_gracefully(trigger ="Couldn't find arctis7 model")
# select its interface and USB endpoint, and capture the endpoint address
try:
# interface index 7 of the Arctis 7+ is the USB HID for the ChatMix dial;
# its actual interface number on the device itself is 5.
self.interface = self.dev[0].interfaces()[7]
self.interface_num = self.interface.bInterfaceNumber
self.endpoint = self.interface.endpoints()[0]
self.addr = self.endpoint.bEndpointAddress
except Exception as e:
self.log.error("""Failure to identify relevant
USB device's interface or endpoint. Shutting down...""")
self.die_gracefully(exc=True, trigger ="identification of USB endpoint")
# detach if the device is active
if self.dev.is_kernel_driver_active(self.interface_num):
self.dev.detach_kernel_driver(self.interface_num)
self.VAC = self._init_VAC()
def _init_log(self):
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
stdout_handler = logging.StreamHandler()
stdout_handler.setLevel(logging.DEBUG)
stdout_handler.setFormatter(logging.Formatter('%(levelname)8s | %(message)s'))
log.addHandler(stdout_handler)
return (log)
def _init_VAC(self):
"""Get name of default sink, establish virtual sink
and pipe its output to the default sink
"""
# get the default sink id from pactl
self.system_default_sink = os.popen("pactl get-default-sink").read().strip()
self.log.info(f"default sink identified as {self.system_default_sink}")
# attempt to identify an Arctis sink via pactl
try:
pactl_short_sinks = os.popen("pactl list short sinks").readlines()
# grab any elements from list of pactl sinks that are Arctis 7
arctis = re.compile('.*[aA]rctis.*7')
arctis_sink = list(filter(arctis.match, pactl_short_sinks))[0]
# split the arctis line on tabs (which form table given by 'pactl short sinks')
tabs_pattern = re.compile(r'\t')
tabs_re = re.split(tabs_pattern, arctis_sink)
# skip first element of tabs_re (sink's ID which is not persistent)
arctis_device = tabs_re[1]
self.log.info(f"Arctis sink identified as {arctis_device}")
default_sink = arctis_device
except Exception as e:
self.log.error("""Something wrong with Arctis definition
in pactl list short sinks regex matching.
Likely no match found for device, check traceback.
""", exc_info=True)
self.die_gracefully(trigger="No Arctis device match")
# Destroy virtual sinks if they already existed incase of previous failure:
try:
destroy_a7p_game = os.system("pw-cli destroy Arctis_Game 2>/dev/null")
destroy_a7p_chat = os.system("pw-cli destroy Arctis_Chat 2>/dev/null")
if destroy_a7p_game == 0 or destroy_a7p_chat == 0:
raise Exception
except Exception as e:
self.log.info("""Attempted to destroy old VAC sinks at init but none existed""")
# Instantiate our virtual sinks - Arctis_Chat and Arctis_Game
try:
self.log.info("Creating VACS...")
os.system("""pw-cli create-node adapter '{
factory.name=support.null-audio-sink
node.name=Arctis_Game
node.description="Arctis 7+ Game"
media.class=Audio/Sink
monitor.channel-volumes=true
object.linger=true
audio.position=[FL FR]
}' 1>/dev/null
""")
os.system("""pw-cli create-node adapter '{
factory.name=support.null-audio-sink
node.name=Arctis_Chat
node.description="Arctis 7+ Chat"
media.class=Audio/Sink
monitor.channel-volumes=true
object.linger=true
audio.position=[FL FR]
}' 1>/dev/null
""")
except Exception as E:
self.log.error("""Failure to create node adapter -
Arctis_Chat virtual device could not be created""", exc_info=True)
self.die_gracefully(sink_creation_fail=True, trigger="VAC node adapter")
#route the virtual sink's L&R channels to the default system output's LR
try:
self.log.info("Assigning VAC sink monitors output to default device...")
os.system(f'pw-link "Arctis_Game:monitor_FL" '
f'"{default_sink}:playback_FL" 1>/dev/null')
os.system(f'pw-link "Arctis_Game:monitor_FR" '
f'"{default_sink}:playback_FR" 1>/dev/null')
os.system(f'pw-link "Arctis_Chat:monitor_FL" '
f'"{default_sink}:playback_FL" 1>/dev/null')
os.system(f'pw-link "Arctis_Chat:monitor_FR" '
f'"{default_sink}:playback_FR" 1>/dev/null')
except Exception as e:
self.log.error("""Couldn't create the links to
pipe LR from VAC to default device""", exc_info=True)
self.die_gracefully(sink_fail=True, trigger="LR links")
# set the default sink to Arctis Game
os.system('pactl set-default-sink Arctis_Game')
def start_modulator_signal(self):
"""Listen to the USB device for modulator knob's signal
and adjust volume accordingly
"""
self.log.info("Reading modulator USB input started")
self.log.info("-"*45)
self.log.info("Arctis 7+ ChatMix Enabled!")
self.log.info("-"*45)
while True:
try:
# read the input of the USB signal. Signal is sent in 64-bit interrupt packets.
# read_input[1] returns value to use for default device volume
# read_input[2] returns the value to use for virtual device volume
read_input = self.dev.read(self.addr, 64)
default_device_volume = "{}%".format(read_input[1])
virtual_device_volume = "{}%".format(read_input[2])
# os.system calls to issue the commands directly to pactl
os.system(f'pactl set-sink-volume Arctis_Game {default_device_volume}')
os.system(f'pactl set-sink-volume Arctis_Chat {virtual_device_volume}')
except usb.core.USBTimeoutError:
pass
except usb.core.USBError:
self.log.fatal("USB input/output error - likely disconnect")
break
def __handle_sigterm(self, sig, frame):
self.die_gracefully()
def die_gracefully(self, sink_creation_fail=False, trigger=None):
"""Kill the process and remove the VACs
on fatal exceptions or SIGTERM / SIGINT
"""
self.log.info('Cleanup on shutdown')
os.system(f"pactl set-default-sink {self.system_default_sink}")
# cleanup virtual sinks if they exist
if sink_creation_fail == False:
self.log.info("Destroying virtual sinks...")
os.system("pw-cli destroy Arctis_Game 1>/dev/null")
os.system("pw-cli destroy Arctis_Chat 1>/dev/null")
if trigger is not None:
self.log.info("-"*45)
self.log.fatal("Failure reason: " + trigger)
self.log.info("-"*45)
sys.exit(1)
else:
self.log.info("-"*45)
self.log.info("Artcis 7+ ChatMix shut down gracefully... Bye Bye!")
self.log.info("-"*45)
sys.exit(0)
# init
if __name__ == '__main__':
a7pcm_service = Arctis7PlusChatMix()
a7pcm_service.start_modulator_signal()