Install using

python3 -m pip install openiap

Please see Getting Started on how to set up and run example code.

Quick Start Example

In a folder called .vscode, add launch.json.

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Run main.py",
            "type": "python",
            "request": "launch",
            "program": "main.py",
            "console": "integratedTerminal",
            "env": {
                "apiurl": "grpc://username:password@grpc.app.openiap.io:443",
            }
        }
    ]
}

Next, add main.py to the root folder.

import asyncio
from openiap import Client
async def main():
    client = openiap.Client()
    client.Signin()
    result = await client.Query(collectionname="entities", projection={"_created": 1, "name": 1, "_type": 1})
    print(result)
    client.Close()
asyncio.run(main())

Now press F5 or click Run, to run the example. We HIGHLY discourage the use of username and password. Please see Getting Started on how to generate JWT tokens and to easily swap between multiple users and OpenFlow instances.

RegisterQueue

To register and consume a message queue:

import json
import asyncio
from openiap import Client

async def main():
    client = Client()
    await client.Signin()
    queuename = await client.RegisterQueue(queuename="myqueue", callback=callback_function)
    print(f"Consuming queue {queuename}")
    while True:
        await asyncio.sleep(1)

async def callback_function(cli:Client, message, payload):
    print(json.dumps(payload, indent=2))

if __name__ == "__main__":
    asyncio.run(main())

This will give issues if we disconnect from the server. When the client reconnects, we will no longer be consuming the queue, so let’s update the logic to handle registering the queue after reconnection.

import openiap, asyncio
from openiap import Client
import json
async def callback_function(cli:Client, message, payload):
    print(json.dumps(payload, indent=2))
async def onconnected(cli:Client):
    try:
        await cli.Signin()
        print("Connected to OpenIAP") 
        queuename = await cli.RegisterQueue("myqueue", callback_function)
        print(f"Consuming queue {queuename}")
    except Exception as e:
        print(e)
async def main():
    client = openiap.Client()
    client.onconnected = onconnected
    while True:
        await asyncio.sleep(1)
if __name__ == "__main__":
    asyncio.run(main())

This way, we sign in and re-consume the queue every time the client reconnects to the server.

openiap module

Classes ——-

Client(url: str = '', grpc_max_receive_message_length: int = 4194304)

Methods

Aggregate(self, aggregates: dict = {}, collectionname: str = 'entities', queryas: str = None) :

Close(self) :

Count(self, query: dict = {}, collectionname: str = 'entities', queryas: str = None) :

DeleteMany(self, query: dict, collectionname: str = 'entities', recursive: bool = False) :

DeleteOne(self, id: str, collectionname: str = 'entities', recursive: bool = False) :

DownloadFile(self, Id: str = None, Filename: str = None) :

DropCollection(self, collectionname: str) :

GetDocumentVersion(self, id: str, collectionname: str, version: int = 0, decrypt: bool = True) :

GetElement(self, xpath: str) :

InsertMany(self, items: [], collectionname: str = 'entities', skipresults: bool = False) :

InsertOne(self, item: dict, collectionname: str = 'entities') :

InsertOrUpdateMany(self, items: dict, collectionname: str = 'entities', uniqeness: str = '_id', skipresults: bool = False) :

InsertOrUpdateOne(self, item: dict, collectionname: str = 'entities', uniqeness: str = '_id') :

ListCollections(self, includehist: bool = False) :

PopWorkitem(self, wiq: str, includefiles: bool = False, compressed: bool = False) :

PushWorkitem(self, wiq: str, name: str, payload: dict, files: <built-in function any> = None, wiqid: str = None, nextrun: datetime.datetime = None, priority: int = 2, compressed: bool = False) :

Query(self, collectionname: str = 'entities', query: dict = {}, projection: dict = {}, top: int = 100, skip: int = 0, orderby=None, queryas: str = None) :

QueueMessage(self, queuename: str, payload, correlationId=None, striptoken=True, rpc=False) :

RegisterQueue(self, queuename: str, callback) :

Signin(self, username: str = None, password: str = None, ping: bool = True, validateonly: bool = False, longtoken: bool = False) :

UnWatch(self, id: str) :

UpdateDocument(self, query: dict, document: dict, collectionname: str = 'entities') :

UpdateOne(self, item: dict, collectionname: str = 'entities') :

UpdateWorkitem(self, workitem, files: <built-in function any> = None, compressed: bool = False, ignoremaxretries: bool = False) :

Watch(self, collectionname, paths: list, callback) :

ainput(self, string: str) ‑> str :

onconnected(self, client) :

onmessage(self, client, command, rid, message) :

GracefulKiller()

Class variables

kill_now :

Methods

exit_gracefully(self, *args) :