====== rtc_handle ======
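rtc_handle is a tool that provides an environment for freely operating RT components (RTCs) of the RT middleware (OpenRTM-aist, RTM) from the user's Python environment. For now, it should work with both Python 2 and Python 3.
**GitHub site (updated from time to time):** [[https://github.com/takashi-suehiro/rtmtools|rtmtools]]
For usage, see README.md on GitHub, the ipnb files in rtc_handle_example/script, and the html files generated from them.\\
At present, basic.html is provided as an introduction to basic usage. More material is planned.
The first-edition manuals are listed below. Some details differ, but the basic usage is the same, so please use them as a reference.
**First-edition manuals:** (RT Middleware Contest 2008, winner of the Japan Robot Association Award)
* [[https://www.openrtm.org/rt/RTMcontest/2008/doc/1L4-5.pdf|Overview]]
* [[https://www.openrtm.org/rt/RTMcontest/2008/doc/1L4-5/rtc_handle_2.pdf|Presentation slides]]
* [[https://www.openrtm.org/rt/RTMcontest/2008/doc/1L4-5/RtcHandle_manual.pdf|First-edition manual]]
The following was generated from an ipnb and converted to DokuWiki format.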
----
===== rtc_handle.py(basic) =====
* this ipnb shows the basic usage of rtc_handle.py
* precondition: the rtcs (cin and cout) are launched separately beforehand
* you can monitor the behavior of the system with openrtp
you can access and control rtcs of OpenRTM-aist (written in any language, e.g., C++, Python, Java) by using rtc_handle.py
#!/usr/bin/env python
# -*- Python -*-
import sys
import time
import subprocess
==== set up user environment ====
* path for rtc_handle
* path for rtcs and user tools
* nameservers
you may provide a file (e.g., set_env.py) for this.
#
# set up user environment
# RtmToolsDir, MyRtcDir, etc.
#
# from set_env import * : you may provide a setup file like this
#
RtmToolsDir="../.."
MyRtcDir=".."
NS0="localhost:9876"
==== import user tools ====
sys.path is modified temporarily to import the tools and restored afterwards.
stubs for rtc service ports might also be imported here (this will be explained in another example).
#
# import user tools
#
sys.path.append(".")
save_path = sys.path[:]
sys.path.append(RtmToolsDir+'/rtc_handle')
from rtc_handle import *
# from rtc_handle_util import *
# sys.path.append(RtmToolsDir+'/embryonic_rtc')
# from EmbryonicRtc import *
sys.path = save_path
#
# import stub files
#
#import _GlobalIDL
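The save-and-restore pattern used above for sys.path can also be wrapped in a context manager, so that the path is restored even if an import fails. The following is only a minimal sketch: ''temporary_sys_path'' and the directory name are hypothetical and are not part of rtc_handle, and a standard-library module stands in for the real import.

```python
import contextlib
import sys


@contextlib.contextmanager
def temporary_sys_path(*extra_dirs):
    """Append extra_dirs to sys.path, then restore the original list on exit."""
    saved = sys.path[:]          # copy, same idea as save_path above
    sys.path.extend(extra_dirs)
    try:
        yield
    finally:
        sys.path[:] = saved      # restore in place, even if the import fails


# Usage: a standard-library module stands in for rtc_handle here.
# The directory is a hypothetical placeholder for RtmToolsDir + '/rtc_handle'.
with temporary_sys_path("/hypothetical/rtmtools/rtc_handle"):
    import json  # in the real script: from rtc_handle import *
```

Note that ''from rtc_handle import *'' is only allowed at module level, so the with block has to stay at the top level of the script.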
==== RtmEnv: rtm environment holder ====
an RtmEnv object holds an orb, nameservers, rtcs, connectors and other rtm environment information.
the second argument is a list of cos nameservers.
#
# user program
#
#
env = RtmEnv(sys.argv,[NS0])
==== NameSpace ====
NameSpace.list_obj() retrieves a list of corba objects in the nameserver and puts them into the NameSpace.obj_list dictionary.
if an object is an rtc, its proxy object (RtcHandle) is created and put into the NameSpace.rtc_handles dictionary.
env.name_space[NS0].list_obj()
objcet cin0.rtc was listed.
port_name: str_out
handle for cin0.rtc was created.
objcet cout0.rtc was listed.
port_name: str_in
handle for cout0.rtc was created.
[['cin0.rtc', ],
['cout0.rtc', ]]
env.name_space[NS0].obj_list
{'cin0.rtc': ,
'cout0.rtc': }
env.name_space[NS0].rtc_handles
{'cin0.rtc': ,
'cout0.rtc': }
==== RtcHandle: proxy of rtc ====
RtcHandle is a proxy class for an rtc and the central object of this module. an rtc can be accessed by its object reference, and information about the rtc can be gathered through that reference. an RtcHandle object holds this information and acts as a proxy for the rtc.
=== assign rtc proxies to variables ===
to ease access to the rtc proxies, it may be a good idea to assign them to variables.
cin=env.name_space[NS0].rtc_handles['cin0.rtc']
cout=env.name_space[NS0].rtc_handles['cout0.rtc']
=== activate and deactivate rtcs ===
cout.activate()
RTC_OK
cout.deactivate()
RTC_OK
cout.activate()
RTC_OK
deactivation of some rtcs may fail. such rtcs may be waiting for resources (e.g., waiting for user input) in the onExecute loop. for example,
cin.activate()
RTC_OK
cin.deactivate()
RTC_ERROR
but it usually recovers once the resource becomes available. please input something at the cin console. then,
cin.activate()
RTC_OK
=== direct access to Inports and Outports ===
if the interface_type of the ports is corba_cdr, you can put data to inports and get data from outports by using rtc_handle.py.
== put data to inport ==
cout.inports
{'str_in': }
cout.inports['str_in'].open()
RTC_OK
cout.inports['str_in'].write('abc')
PORT_OK
cout.inports['str_in'].close()
RTC_OK
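Since every open() should be paired with a close(), the sequence above can be wrapped so that the port is closed even when the write raises. This is a minimal sketch that relies only on the open/write/close methods shown above. ''opened'' and ''FakeInport'' are hypothetical names, the fake port merely records calls so that the example runs without OpenRTM-aist, and return codes are not checked.

```python
import contextlib


@contextlib.contextmanager
def opened(port):
    """Open a port handle and always close it again, even on errors."""
    port.open()
    try:
        yield port
    finally:
        port.close()


class FakeInport(object):
    """Hypothetical stand-in for cout.inports['str_in']; it only records calls."""

    def __init__(self):
        self.calls = []

    def open(self):
        self.calls.append("open")

    def write(self, data):
        self.calls.append(("write", data))

    def close(self):
        self.calls.append("close")


port = FakeInport()
with opened(port) as p:
    p.write("abc")
print(port.calls)  # ['open', ('write', 'abc'), 'close']
```

With a real handle, the same helper would be used as ''with opened(cout.inports['str_in']) as p: p.write('abc')''.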
== get data from outport ==
by connecting to an outport with 'dataport.dataflow_type' set to 'pull', you can get 'dataport.corba_cdr.outport_ref'.
through this reference you can directly get the latest data (provided it has not already been consumed by other rtcs).
cin.outports
{'str_out': }
cin.outports['str_out'].open()
RTC_OK
cin.outports['str_out'].con.prop_dict
{'dataport.dataflow_type': 'pull',
'dataport.interface_type': 'corba_cdr',
'dataport.subscription_type': 'new',
'dataport.publisher.push_policy': 'new',
'dataport.inport.buffer.length': '1',
'dataport.inport.buffer.read.empty_policy': 'do_nothing',
'dataport.inport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.length': '1',
'dataport.outport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.read.empty_policy': 'do_nothing',
'dataport.data_type': 'TimedString',
'dataport.serializer.cdr.endian': 'little,big',
'dataport.corba_cdr.outport_ior': 'IOR:010000001b00000049444c3a4f70656e52544d2f4f7574506f72744364723a312e300000010000000000000064000000010102000e0000003139322e3136382e35302e33350063cd0e000000fe37b1886100008405000000000a00000200000000000000080000000100000000545441010000001c00000001000000010001000100000001000105090101000100000009010100',
'dataport.corba_cdr.outport_ref': }
cin.outports['str_out'].read()
cin.outports['str_out'].close()
RTC_OK
==== IOConnector : connect and disconnect io-ports ====
IOConnector contains information for connecting io-ports
create a connector between cin.outports['str_out'] and cout.inports['str_in']
con = IOConnector([cin.outports['str_out'], cout.inports['str_in']])
the default properties of the connector are as follows
con.def_prop
{'dataport.dataflow_type': 'push',
'dataport.interface_type': 'corba_cdr',
'dataport.subscription_type': 'new',
'dataport.publisher.push_policy': 'new',
'dataport.inport.buffer.length': '1',
'dataport.inport.buffer.read.empty_policy': 'do_nothing',
'dataport.inport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.length': '1',
'dataport.outport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.read.empty_policy': 'do_nothing'}
connect ports
con.connect()
RTC_OK
con.profile
RTC.ConnectorProfile(name='cin0.str_out_cout0.str_in', connector_id='83a8f542-408b-11ec-b42d-594718fca3b7', ports=[, ], properties=[SDOPackage.NameValue(name='dataport.dataflow_type', value=CORBA.Any(CORBA.TC_string, 'push')), SDOPackage.NameValue(name='dataport.interface_type', value=CORBA.Any(CORBA.TC_string, 'corba_cdr')), SDOPackage.NameValue(name='dataport.subscription_type', value=CORBA.Any(CORBA.TC_string, 'new')), SDOPackage.NameValue(name='dataport.publisher.push_policy', value=CORBA.Any(CORBA.TC_string, 'new')), SDOPackage.NameValue(name='dataport.inport.buffer.length', value=CORBA.Any(CORBA.TC_string, '1')), SDOPackage.NameValue(name='dataport.inport.buffer.read.empty_policy', value=CORBA.Any(CORBA.TC_string, 'do_nothing')), SDOPackage.NameValue(name='dataport.inport.buffer.write.full_policy', value=CORBA.Any(CORBA.TC_string, 'overwrite')), SDOPackage.NameValue(name='dataport.outport.buffer.length', value=CORBA.Any(CORBA.TC_string, '1')), SDOPackage.NameValue(name='dataport.outport.buffer.write.full_policy', value=CORBA.Any(CORBA.TC_string, 'overwrite')), SDOPackage.NameValue(name='dataport.outport.buffer.read.empty_policy', value=CORBA.Any(CORBA.TC_string, 'do_nothing')), SDOPackage.NameValue(name='dataport.data_type', value=CORBA.Any(CORBA.TC_string, 'TimedString')), SDOPackage.NameValue(name='dataport.serializer.cdr.endian', value=CORBA.Any(CORBA.TC_string, 'little,big')), SDOPackage.NameValue(name='dataport.corba_cdr.inport_ior', value=CORBA.Any(CORBA.TC_string, 'IOR:010000001a00000049444c3a4f70656e52544d2f496e506f72744364723a312e30000000010000000000000064000000010102000e0000003139322e3136382e35302e33350041cb0e000000fe37b1886100008404000000001900000200000000000000080000000100000000545441010000001c00000001000000010001000100000001000105090101000100000009010100')), SDOPackage.NameValue(name='dataport.corba_cdr.inport_ref', value=CORBA.Any(CORBA.TypeCode("IDL:OpenRTM/InPortCdr:1.0"), ))])
con.prop_dict
{'dataport.dataflow_type': 'push',
'dataport.interface_type': 'corba_cdr',
'dataport.subscription_type': 'new',
'dataport.publisher.push_policy': 'new',
'dataport.inport.buffer.length': '1',
'dataport.inport.buffer.read.empty_policy': 'do_nothing',
'dataport.inport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.length': '1',
'dataport.outport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.read.empty_policy': 'do_nothing',
'dataport.data_type': 'TimedString',
'dataport.serializer.cdr.endian': 'little,big',
'dataport.corba_cdr.inport_ior': 'IOR:010000001a00000049444c3a4f70656e52544d2f496e506f72744364723a312e30000000010000000000000064000000010102000e0000003139322e3136382e35302e33350041cb0e000000fe37b1886100008404000000001900000200000000000000080000000100000000545441010000001c00000001000000010001000100000001000105090101000100000009010100',
'dataport.corba_cdr.inport_ref': }
disconnect ports
con.disconnect()
RTC_OK
=== change properties ===
you can change properties by giving prop_dict
con = IOConnector([cin.outports['str_out'], cout.inports['str_in']],
prop_dict={'dataport.inport.buffer.length': '8'})
con.prop_dict_req
{'dataport.dataflow_type': 'push',
'dataport.interface_type': 'corba_cdr',
'dataport.subscription_type': 'new',
'dataport.publisher.push_policy': 'new',
'dataport.inport.buffer.length': '8',
'dataport.inport.buffer.read.empty_policy': 'do_nothing',
'dataport.inport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.length': '1',
'dataport.outport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.read.empty_policy': 'do_nothing',
'dataport.data_type': 'TimedString'}
con.connect()
RTC_OK
con.prop_dict
{'dataport.dataflow_type': 'push',
'dataport.interface_type': 'corba_cdr',
'dataport.subscription_type': 'new',
'dataport.publisher.push_policy': 'new',
'dataport.inport.buffer.length': '8',
'dataport.inport.buffer.read.empty_policy': 'do_nothing',
'dataport.inport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.length': '1',
'dataport.outport.buffer.write.full_policy': 'overwrite',
'dataport.outport.buffer.read.empty_policy': 'do_nothing',
'dataport.data_type': 'TimedString',
'dataport.serializer.cdr.endian': 'little,big',
'dataport.corba_cdr.inport_ior': 'IOR:010000001a00000049444c3a4f70656e52544d2f496e506f72744364723a312e30000000010000000000000064000000010102000e0000003139322e3136382e35302e33350041cb0e000000fe37b1886100008404000000001a00000200000000000000080000000100000000545441010000001c00000001000000010001000100000001000105090101000100000009010100',
'dataport.corba_cdr.inport_ref': }
con.disconnect()
RTC_OK
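The prop_dict_req output above looks like the default properties with the given keys replaced. That override behaviour can be illustrated with a plain dictionary merge. This is only a sketch of the semantics visible in the output, not the actual implementation in rtc_handle; ''merge_properties'' is a hypothetical name and the defaults are a shortened copy of the list shown earlier.

```python
def merge_properties(defaults, overrides=None):
    """Return a new dict: defaults, with any overrides applied on top."""
    merged = dict(defaults)          # copy so the defaults stay untouched
    merged.update(overrides or {})
    return merged


# A shortened copy of the default properties listed earlier.
def_prop = {
    'dataport.dataflow_type': 'push',
    'dataport.interface_type': 'corba_cdr',
    'dataport.inport.buffer.length': '1',
}

requested = merge_properties(def_prop, {'dataport.inport.buffer.length': '8'})
print(requested['dataport.inport.buffer.length'])  # 8
print(def_prop['dataport.inport.buffer.length'])   # 1
```

Keys that are not overridden keep their default values, which matches the unchanged entries in the output above.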
=== conflict of connections ===
only one connection is permitted between the same pair of ports.
so if another connection already exists between them, your connector cannot control it.
for example,
b = IOConnector([cin.outports['str_out'], cout.inports['str_in']])
b.connect()
RTC_OK
con.connect()
there exists another connection. please try force=True.
'NO'
con.disconnect()
there exists another connection. please try force=True.
'NO'
you can handle this situation by forcing the connect/disconnect operation
con.connect(force=True)
RTC_OK
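In a script, the "try normally, then force" sequence can be kept in one small helper. The sketch below assumes only what the session above shows: connect() returns 'NO' when another connection exists, and it accepts force=True. ''connect_with_fallback'' and ''FakeConnector'' are hypothetical, and the string 'RTC_OK' is a placeholder for the real return code so that the example runs without OpenRTM-aist.

```python
def connect_with_fallback(connector):
    """Try a normal connect; retry with force=True if it is refused."""
    result = connector.connect()
    if result == 'NO':               # refusal value shown in the session above
        result = connector.connect(force=True)
    return result


class FakeConnector(object):
    """Hypothetical stand-in that refuses unless force=True is given."""

    def connect(self, force=False):
        # 'RTC_OK' is a placeholder string for the real return code.
        return 'RTC_OK' if force else 'NO'


print(connect_with_fallback(FakeConnector()))  # RTC_OK
```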