Aiohttp WSDL Post

In the previous article we discussed aiohttp. Like the requests library, aiohttp supports both POST and GET requests.

With SOAP, there are three things to pay attention to when sending a POST request to the server:

  • url
  • body
  • headers
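
As a rough illustration of how these three pieces fit together, the sketch below bundles them into one object. The `SoapRequest` class and its `as_post_kwargs` method are hypothetical names made up for this example, and the URL and body are placeholders rather than a working service call.

```python
from dataclasses import dataclass, field


@dataclass
class SoapRequest:
    """Hypothetical container for the three parts of a SOAP POST."""
    url: str                                      # endpoint of the web service
    body: bytes                                   # serialized XML envelope
    headers: dict = field(default_factory=dict)   # HTTP headers, including the SOAP action

    def as_post_kwargs(self) -> dict:
        # Keyword arguments in the shape used by session.post(url, data=..., headers=...)
        return {"url": self.url, "data": self.body, "headers": self.headers}


request = SoapRequest(
    url="https://xx.com",      # placeholder URL
    body=b"<Envelope/>",       # placeholder body, not a valid SOAP envelope
    headers={"Content-Type": "text/xml; charset=utf-8"},
)
```

Keeping the three values together makes it harder to send a body with headers that belong to a different SOAP action.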

Build the XML envelope with etree

login_envelope = cls.ev.Envelope(
    cls.s.Header(
        cls.h.ContractVersion('0')
    ),
    cls.s.Body(
        cls.l.LogonRequest(
            cls.lg.logonRequestData(
                cls.a.DomainCode('xxx'),
                cls.a.AgentName('xxx'),
                cls.a.Password('xxx'),
                cls.a.LocationCode('', {cls.nill_type: 'true'}),
                cls.a.RoleCode('xxxxx'),
                cls.a.TerminalInfo('', {cls.nill_type: 'true'})
            ),
            xmlns='http://schemas.navitaire.com/WebServices/ServiceContracts/SessionService'
        )
    )
)
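
The snippet above relies on builder objects (`cls.ev`, `cls.s`, `cls.a` and so on) that are not shown here. For readers who want something self-contained, here is a minimal sketch of a similar envelope using the standard library's `xml.etree.ElementTree` instead of those builders. It makes several assumptions: every request element is placed in the single SessionService namespace for simplicity (the real service most likely uses several namespaces), `cls.nill_type` is assumed to correspond to the `xsi:nil` attribute, and `build_logon_envelope` is a hypothetical helper name.

```python
import xml.etree.ElementTree as ET

SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/"  # SOAP 1.1 envelope namespace
# Namespace taken from the article; using it for every element is an assumption
SESSION_NS = "http://schemas.navitaire.com/WebServices/ServiceContracts/SessionService"
XSI_NS = "http://www.w3.org/2001/XMLSchema-instance"


def build_logon_envelope(domain, agent, password):
    """Return a serialized SOAP envelope (bytes) for a logon-style request."""
    envelope = ET.Element(f"{{{SOAP_NS}}}Envelope")
    ET.SubElement(envelope, f"{{{SOAP_NS}}}Header")
    body = ET.SubElement(envelope, f"{{{SOAP_NS}}}Body")
    logon = ET.SubElement(body, f"{{{SESSION_NS}}}LogonRequest")
    data = ET.SubElement(logon, f"{{{SESSION_NS}}}logonRequestData")

    # Plain text fields
    for tag, value in (("DomainCode", domain),
                       ("AgentName", agent),
                       ("Password", password)):
        ET.SubElement(data, f"{{{SESSION_NS}}}{tag}").text = value

    # Empty field explicitly marked as nil (assumed meaning of cls.nill_type)
    ET.SubElement(data, f"{{{SESSION_NS}}}LocationCode", {f"{{{XSI_NS}}}nil": "true"})

    return ET.tostring(envelope, encoding="utf-8")


xml_bytes = build_logon_envelope("xxx", "xxx", "xxx")
```

The resulting bytes can be used as the request body in the same way as the output of `etree.tostring(login_envelope)`.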

Create the headers. SoapUI is a handy tool for testing the request.

headers = {
    'Accept': '*/*',
    'Accept-Language': 'en-us',
    'soapaction': 'http://schemas.navitaire.com/WebServices/ISessionManager/Logon',
    'Content-Type': 'text/xml; charset=utf-8',
    'Accept-Encoding': 'gzip, deflate',
    'Connection': 'Keep-Alive',
    'Pragma': 'no-cache',
}
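
If the service exposes more than one operation, only the SOAP action usually changes between calls. One possible way to avoid repeating the whole dictionary is a small helper like the sketch below. The `soap_headers` function is a hypothetical name, and the choice of which headers to treat as defaults is an assumption based on the dictionary above.

```python
def soap_headers(action, extra=None):
    """Build HTTP headers for a SOAP 1.1 POST with the given SOAP action."""
    headers = {
        "Content-Type": "text/xml; charset=utf-8",  # SOAP 1.1 uses text/xml
        "SOAPAction": action,                       # tells the server which operation to run
        "Accept": "*/*",
    }
    if extra:
        # Caller-supplied headers override the defaults
        headers.update(extra)
    return headers


logon_headers = soap_headers(
    "http://schemas.navitaire.com/WebServices/ISessionManager/Logon",
    extra={"Pragma": "no-cache"},
)
```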

Post the data to the web service with aiohttp

import asyncio

import aiohttp
import async_timeout
from lxml import etree  # assumption: the envelope above was built with lxml

# Serialize the envelope to bytes for the request body
body = etree.tostring(login_envelope)


async def fetch(session, url, data, headers):
    # Abort if the request takes longer than 100 seconds
    async with async_timeout.timeout(100):
        async with session.post(url, data=data, headers=headers) as response:
            return await response.text()


async def main(url, data, headers):
    async with aiohttp.ClientSession() as session:
        return await fetch(session, url, data, headers)


# asyncio.run creates the event loop, runs main, and closes the loop
data = asyncio.run(main("https://xx.com", body, headers))
print(data)
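
The response comes back as XML text, and a SOAP 1.1 server reports errors as a `Fault` element inside the body, so it can be worth checking for one before using the result. The sketch below uses the standard library's `xml.etree.ElementTree`. The `extract_fault` helper is a hypothetical name, and the sample response is made up for illustration rather than taken from a real server.

```python
import xml.etree.ElementTree as ET

SOAP_NS = "http://schemas.xmlsoap.org/soap/envelope/"  # SOAP 1.1 envelope namespace


def extract_fault(response_text):
    """Return the faultstring of a SOAP 1.1 response, or None if there is no fault."""
    root = ET.fromstring(response_text)
    fault = root.find(f"{{{SOAP_NS}}}Body/{{{SOAP_NS}}}Fault")
    if fault is None:
        return None
    # In SOAP 1.1, faultstring is an unqualified child of Fault
    return fault.findtext("faultstring", default="")


# Made-up sample response, for illustration only
sample = (
    '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/">'
    "<s:Body><s:Fault>"
    "<faultcode>s:Client</faultcode>"
    "<faultstring>Invalid credentials</faultstring>"
    "</s:Fault></s:Body></s:Envelope>"
)
fault_message = extract_fault(sample)
```

In the example above, this check would be applied to the text returned by `fetch` before treating the logon as successful.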

 

 

 


Alfin F

Technopreneur , Developer, Advisor
