rtc_handle
RTミドルウェア(OpenRTM-aist,RTM)のRTコンポーネント(RTC)をユーザのpython環境から自由に操作するための 環境を提供するツールです.とりあえずpython2, python3の両方で動くはずです.
ghithubサイト(ときどき手を入れています):rtmtools
使用方法は,githubのREADME.mdおよびrtc_handle_example/scriptにあるipnbおよびそこから作成したhtmlを見て下さい.
現在,基本的な使い方としてbasic.htmlが置いてあります.今後も拡充する予定です.
初版のマニュアル類は以下にあります.細かい点で異なる部分もありますが基本的な使い方は同じなので参考にして下さい.
初版のマニュアル類:(RTミドルウェアコンテスト2008,日本ロボット工業会賞受賞)
以下はipnbで生成したものをdokuwikiフォーマットに変換したものです.
rtc_handle.py(basic)
- this ipnb shows a basic usage of rtc_handle.py
- precondition: rtcs(cin and cout) are prelaunched separetely
- you can monitor the behavior of the system with openrtp
you can access and control rtcs of OpenRTM-aist (written in any languages, ie., c++, python, java) by using rtc_handle.py
#!/usr/bin/env python # -*- Python -*- import sys import time import subprocess
setup user environmet
- path for rtc_handle
- path for rtcs and user tools
- nameservers
you may provide a file (ex. set_env.py) for this.
# # set up user environment # RtmToolsDir, MyRtcDir, etc. # # from set_env import * : you may provide a setup file like this # RtmToolsDir="../.." MyRtcDir=".." NS0="localhost:9876"
import user tools
path is modified temporaly to import tools
stubs for rtc service ports might be imported (this will be explained another example).
# # import user tools # sys.path.append(".") save_path = sys.path[:] sys.path.append(RtmToolsDir+'/rtc_handle') from rtc_handle import * # from rtc_handle_util import * # sys.path.append(RtmToolsDir+'/embryonic_rtc') # from EmbryonicRtc import * sys.path = save_path # # import stub files # #import _GlobalIDL
RtmEnv: rtm environment holder
RtmEnv class object contains an orb, name-servers, rtcs, connectors and other rtm environment information.
the second arg is a list of cos nameservers.
# # user program # # env = RtmEnv(sys.argv,[NS0])
NameSpace
NameSpace.env.list_obj() retrieves a list of corba objects in the nameserver and put them into the NameSpace.obj_list dictionary.
if an object is an rtc, its proxy object(RtcHandl) is created and put into the NameSpace.rtc_handle dictionary.
env.name_space[NS0].list_obj()
objcet cin0.rtc was listed. port_name: str_out handle for cin0.rtc was created. objcet cout0.rtc was listed. port_name: str_in handle for cout0.rtc was created. [['cin0.rtc', <OpenRTM._objref_DataFlowComponent at 0x7fdbdc4fc970>], ['cout0.rtc', <OpenRTM._objref_DataFlowComponent at 0x7fdbcde64be0>]]
env.name_space[NS0].obj_list
{'cin0.rtc': <OpenRTM._objref_DataFlowComponent at 0x7fdbdc4fc970>,
'cout0.rtc': <OpenRTM._objref_DataFlowComponent at 0x7fdbcde64be0>}
env.name_space[NS0].rtc_handles
{'cin0.rtc': <rtc_handle.RtcHandle at 0x7fdbdc50e3a0>,
'cout0.rtc': <rtc_handle.RtcHandle at 0x7fdbcde662e0>}
RtcHandle: proxy of rtc
RtcHandle is a proxy class of rtc and the center object of this module. an rtc can be accessed by its object reference and information of the rtc can be gathered throug the reference. an RtcHandle object holds those information and proxy the rtc.
assign rtc proxies to valiables
to ease access to rtc proxies, it may be a good idea assigning them to valiables.
cin=env.name_space[NS0].rtc_handles['cin0.rtc'] cout=env.name_space[NS0].rtc_handles['cout0.rtc']
activate and deactivate rtcs
cout.activate()
RTC_OK
cout.deactivate()
RTC_OK
cout.activate()
RTC_OK
deactivation of some rtcs may fail. those rtcs may wait for resorces(ex. waiting for user input) in onExecute loop. for example,
cin.activate()
RTC_OK
cin.deactivate()
RTC_ERROR
but it usually recovers after the resorce is available. please input something at the cin console. then,
cin.activate()
RTC_OK
direct access to Inports and Outports
if the interface_type of the ports is corba_cdr, you can put data to inports and get data from outpors by using rtc_handle.py.
put data to inport
cout.inports
{'str_in': <rtc_handle.RtcInport at 0x7fdbcde6f6a0>}
cout.inports['str_in'].open()
RTC_OK
cout.inports['str_in'].write('abc')
PORT_OK
cout.inports['str_in'].close()
RTC_OK
get data from outport
by connecting to outport with setting ‘datapot.dataflow_type’ : ‘pull’ , you can get ‘dataport.corba_cdr.outport_ref’.
you can directly get the last data (and if not consumed other rtcs) by the ref.
cin.outports
{'str_out': <rtc_handle.RtcOutport at 0x7fdbcde66220>}
cin.outports['str_out'].open()
RTC_OK
cin.outports['str_out'].con.prop_dict
{'dataport.dataflow_type': 'pull',
'dataport.interface_type': 'corba_cdr',
'dataport.subscription_type': 'new',
'dataport.publisher.push_policy': 'new',
'dataport.inport.buffer.length': '1',
'dataport.inport.buffer.read.empty_policy': 'do_nothing',
'dataport.inport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.length': '1',
'dataport.outport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.read.empty_policy': 'do_nothing',
'dataport.data_type': 'TimedString',
'dataport.serializer.cdr.endian': 'little,big',
'dataport.corba_cdr.outport_ior': 'IOR:010000001b00000049444c3a4f70656e52544d2f4f7574506f72744364723a312e300000010000000000000064000000010102000e0000003139322e3136382e35302e33350063cd0e000000fe37b1886100008405000000000a00000200000000000000080000000100000000545441010000001c00000001000000010001000100000001000105090101000100000009010100',
'dataport.corba_cdr.outport_ref': <OpenRTM._objref_OutPortCdr at 0x7fdbcce03550>}
cin.outports['str_out'].read()
cin.outports['str_out'].close()
RTC_OK
IOConnector : connect and disconnect io-ports
IOConnector contains information for connecting io-ports
create a connector between cin.outports[‘str_out’] and cout.inports[‘str_in’]
con = IOConnector([cin.outports['str_out'], cout.inports['str_in']])
default properties of the connector is as follows
con.def_prop
{'dataport.dataflow_type': 'push',
'dataport.interface_type': 'corba_cdr',
'dataport.subscription_type': 'new',
'dataport.publisher.push_policy': 'new',
'dataport.inport.buffer.length': '1',
'dataport.inport.buffer.read.empty_policy': 'do_nothing',
'dataport.inport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.length': '1',
'dataport.outport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.read.empty_policy': 'do_nothing'}
connect ports
con.connect()
RTC_OK
con.profile
RTC.ConnectorProfile(name='cin0.str_out_cout0.str_in', connector_id='83a8f542-408b-11ec-b42d-594718fca3b7', ports=[<RTC._objref_PortService object at 0x7fdbcde71d90>, <RTC._objref_PortService object at 0x7fdbcde71ca0>], properties=[SDOPackage.NameValue(name='dataport.dataflow_type', value=CORBA.Any(CORBA.TC_string, 'push')), SDOPackage.NameValue(name='dataport.interface_type', value=CORBA.Any(CORBA.TC_string, 'corba_cdr')), SDOPackage.NameValue(name='dataport.subscription_type', value=CORBA.Any(CORBA.TC_string, 'new')), SDOPackage.NameValue(name='dataport.publisher.push_policy', value=CORBA.Any(CORBA.TC_string, 'new')), SDOPackage.NameValue(name='dataport.inport.buffer.length', value=CORBA.Any(CORBA.TC_string, '1')), SDOPackage.NameValue(name='dataport.inport.buffer.read.empty_policy', value=CORBA.Any(CORBA.TC_string, 'do_nothing')), SDOPackage.NameValue(name='dataport.inport.buffer.write.full_policy', value=CORBA.Any(CORBA.TC_string, 'overwrite')), SDOPackage.NameValue(name='dataport.outport.buffer.length', value=CORBA.Any(CORBA.TC_string, '1')), SDOPackage.NameValue(name='dataport.outport.buffer.write.full_policy', value=CORBA.Any(CORBA.TC_string, 'overwrite')), SDOPackage.NameValue(name='dataport.outport.buffer.read.empty_policy', value=CORBA.Any(CORBA.TC_string, 'do_nothing')), SDOPackage.NameValue(name='dataport.data_type', value=CORBA.Any(CORBA.TC_string, 'TimedString')), SDOPackage.NameValue(name='dataport.serializer.cdr.endian', value=CORBA.Any(CORBA.TC_string, 'little,big')), SDOPackage.NameValue(name='dataport.corba_cdr.inport_ior', value=CORBA.Any(CORBA.TC_string, 'IOR:010000001a00000049444c3a4f70656e52544d2f496e506f72744364723a312e30000000010000000000000064000000010102000e0000003139322e3136382e35302e33350041cb0e000000fe37b1886100008404000000001900000200000000000000080000000100000000545441010000001c00000001000000010001000100000001000105090101000100000009010100')), SDOPackage.NameValue(name='dataport.corba_cdr.inport_ref', value=CORBA.Any(CORBA.TypeCode("IDL:OpenRTM/InPortCdr:1.0"), <OpenRTM._objref_InPortCdr object at 0x7fdbcce08910>))])
con.prop_dict
{'dataport.dataflow_type': 'push',
'dataport.interface_type': 'corba_cdr',
'dataport.subscription_type': 'new',
'dataport.publisher.push_policy': 'new',
'dataport.inport.buffer.length': '1',
'dataport.inport.buffer.read.empty_policy': 'do_nothing',
'dataport.inport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.length': '1',
'dataport.outport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.read.empty_policy': 'do_nothing',
'dataport.data_type': 'TimedString',
'dataport.serializer.cdr.endian': 'little,big',
'dataport.corba_cdr.inport_ior': 'IOR:010000001a00000049444c3a4f70656e52544d2f496e506f72744364723a312e30000000010000000000000064000000010102000e0000003139322e3136382e35302e33350041cb0e000000fe37b1886100008404000000001900000200000000000000080000000100000000545441010000001c00000001000000010001000100000001000105090101000100000009010100',
'dataport.corba_cdr.inport_ref': <OpenRTM._objref_InPortCdr at 0x7fdbcce08910>}
disconnect ports
con.disconnect()
RTC_OK
change properties
you can change properties by giving prop_dict
con = IOConnector([cin.outports['str_out'], cout.inports['str_in']], prop_dict={'dataport.inport.buffer.length': '8'})
con.prop_dict_req
{'dataport.dataflow_type': 'push',
'dataport.interface_type': 'corba_cdr',
'dataport.subscription_type': 'new',
'dataport.publisher.push_policy': 'new',
'dataport.inport.buffer.length': '8',
'dataport.inport.buffer.read.empty_policy': 'do_nothing',
'dataport.inport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.length': '1',
'dataport.outport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.read.empty_policy': 'do_nothing',
'dataport.data_type': 'TimedString'}
con.connect()
RTC_OK
con.prop_dict
{'dataport.dataflow_type': 'push',
'dataport.interface_type': 'corba_cdr',
'dataport.subscription_type': 'new',
'dataport.publisher.push_policy': 'new',
'dataport.inport.buffer.length': '8',
'dataport.inport.buffer.read.empty_policy': 'do_nothing',
'dataport.inport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.length': '1',
'dataport.outport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.read.empty_policy': 'do_nothing',
'dataport.data_type': 'TimedString',
'dataport.serializer.cdr.endian': 'little,big',
'dataport.corba_cdr.inport_ior': 'IOR:010000001a00000049444c3a4f70656e52544d2f496e506f72744364723a312e30000000010000000000000064000000010102000e0000003139322e3136382e35302e33350041cb0e000000fe37b1886100008404000000001a00000200000000000000080000000100000000545441010000001c00000001000000010001000100000001000105090101000100000009010100',
'dataport.corba_cdr.inport_ref': <OpenRTM._objref_InPortCdr at 0x7fdbcce08f70>}
con.disconnect()
RTC_OK
conflict of connections
only one connection is permitted between the same ports.
so, if another connection exists, you can not control the connection by your connector.
for example,
b = IOConnector([cin.outports['str_out'], cout.inports['str_in']])
b.connect()
RTC_OK
con.connect()
there exists another connection. please try force=True. 'NO'
con.disconnect()
there exists another connection. please try force=True. 'NO'
you can handle this situation by forcing connect/disconnect operation
con.connect(force=True)
RTC_OK