diff mbox series

[v2,08/10] auto-t: add proper AccessPoint object class

Message ID 20221102205327.268693-8-prestwoj@gmail.com (mailing list archive)
State Accepted, archived
Headers show
Series [v2,01/10] wiphy: add wiphy_get_supported_ciphers | expand

Checks

Context Check Description
tedd_an/pre-ci_am success Success
prestwoj/iwd-ci-gitlint success GitLint

Commit Message

James Prestwood Nov. 2, 2022, 8:53 p.m. UTC
The AP mode device APIs were hacked together and only able to start
stop an AP. Now that the AP interface has more functionality its
best to use the DBus class template to access the full AP interface
capabilities.
---
 autotests/util/iwd.py | 101 ++++++++++++++++++++++++++++++++++++++----
 1 file changed, 93 insertions(+), 8 deletions(-)
diff mbox series

Patch

diff --git a/autotests/util/iwd.py b/autotests/util/iwd.py
index 9e96382a..98b9ea1c 100755
--- a/autotests/util/iwd.py
+++ b/autotests/util/iwd.py
@@ -299,6 +299,69 @@  class DeviceProvisioning(IWDDBusAbstract):
     def role(self):
         return self._properties['Role']
 
+class AccessPointDevice(IWDDBusAbstract):
+    '''
+        Class represents net.connman.iwd.AccessPoint
+    '''
+    _iface_name = IWD_AP_INTERFACE
+
+    def start(self, ssid, psk):
+        self._iface.Start(ssid, psk, reply_handler=self._success,
+                                        error_handler=self._failure)
+        self._wait_for_async_op()
+
+        IWD._wait_for_object_condition(self, 'obj.started == True')
+
+    def start_profile(self, ssid):
+        self._iface.StartProfile(ssid, reply_handler=self._success,
+                                        error_handler=self._failure)
+        self._wait_for_async_op()
+
+        IWD._wait_for_object_condition(self, 'obj.started == True')
+
+    def stop(self):
+        self._iface.Stop(reply_handler=self._success,
+                            error_handler=self._failure)
+        self._wait_for_async_op()
+
+        IWD._wait_for_object_condition(self, 'obj.started == False')
+
+    def scan(self):
+        self._iface.Scan(reply_handler=self._success,
+                                        error_handler=self._failure)
+        self._wait_for_async_op()
+
+        IWD._wait_for_object_condition(self, 'obj.scanning == True')
+        IWD._wait_for_object_condition(self, 'obj.scanning == False')
+
+    def get_ordered_networks(self):
+        return self._iface.GetOrderedNetworks()
+
+    @property
+    def started(self):
+        return self._properties['Started']
+
+    @property
+    def name(self):
+        return self._properties['Name']
+
+    @property
+    def scanning(self):
+        return self._properties['Scanning']
+
+    @property
+    def frequency(self):
+        return self._properties['Frequency']
+
+    @property
+    def pairwise_ciphers(self):
+        return self._properties['PairwiseCiphers']
+
+    @property
+    def group_cipher(self):
+        return self._properties['GroupCipher']
+
+
 class Device(IWDDBusAbstract):
     '''
         Class represents a network device object: net.connman.iwd.Device
@@ -312,6 +375,7 @@  class Device(IWDDBusAbstract):
         self._station_props = None
         self._station_debug_obj = None
         self._dpp_obj = None
+        self._ap_obj = None
 
         IWDDBusAbstract.__init__(self, *args, **kwargs)
 
@@ -354,6 +418,17 @@  class Device(IWDDBusAbstract):
 
         return self._station_debug_obj
 
+    @property
+    def _ap(self):
+        if self._properties['Mode'] != 'ap':
+            self._prop_proxy.Set(IWD_DEVICE_INTERFACE, 'Mode', 'ap')
+
+        if self._ap_obj is None:
+            self._ap_obj = AccessPointDevice(object_path=self._object_path,
+                                                namespace=self._namespace)
+
+        return self._ap_obj
+
     def _station_properties(self):
         if self._station_props is not None:
             return self._station_props
@@ -605,20 +680,30 @@  class Device(IWDDBusAbstract):
         except Exception as e:
             raise _convert_dbus_ex(e)
 
-        self._ap_iface = dbus.Interface(self._bus.get_object(IWD_SERVICE,
-                                            self.device_path),
-                                            IWD_AP_INTERFACE)
         if psk:
-            self._ap_iface.Start(ssid, psk, reply_handler=self._success,
-                                    error_handler=self._failure)
+            self._ap.start(ssid, psk)
         else:
-            self._ap_iface.StartProfile(ssid, reply_handler=self._success,
-                                    error_handler=self._failure)
-        self._wait_for_async_op()
+            self._ap.start_profile(ssid)
 
     def stop_ap(self):
         self._prop_proxy.Set(IWD_DEVICE_INTERFACE, 'Mode', 'station')
 
+        IWD._wait_for_object_condition(self, "obj._properties['Mode'] == 'station'")
+
+    @property
+    def group_cipher(self):
+        if self._properties['Mode'] != 'ap':
+            raise Exception('group_cipher only supported in AP mode')
+
+        return self._ap.group_cipher
+
+    @property
+    def pairwise_ciphers(self):
+        if self._properties['Mode'] != 'ap':
+            raise Exception('pairwise_cipher only supported in AP mode')
+
+        return self._ap.pairwise_ciphers
+
     def connect_hidden_network(self, name):
         '''Connect to a hidden network
            Possible exception: BusyEx