mirror of
				https://github.com/openvswitch/ovs
				synced 2025-10-25 15:07:05 +00:00 
			
		
		
		
	Allow subclasses of Idl to define a notification hook
It is useful to make the notification events that Idl processes accessible to users of the library. This will make it possible to keep external systems in sync, but does not impose any particular notification pattern. The Row.from_json() call is added to be able to convert the 'old' JSON response on an update to a Row object to make it easy for users of notify() to see what changed, though this usage of Row is quite different than Idl's typical use. Signed-off-by: Terry Wilson <twilson@redhat.com> Signed-off-by: Ben Pfaff <blp@nicira.com>
This commit is contained in:
		| @@ -26,8 +26,12 @@ vlog = ovs.vlog.Vlog("idl") | |||||||
|  |  | ||||||
| __pychecker__ = 'no-classattr no-objattrs' | __pychecker__ = 'no-classattr no-objattrs' | ||||||
|  |  | ||||||
|  | ROW_CREATE = "create" | ||||||
|  | ROW_UPDATE = "update" | ||||||
|  | ROW_DELETE = "delete" | ||||||
|  |  | ||||||
| class Idl: |  | ||||||
|  | class Idl(object): | ||||||
|     """Open vSwitch Database Interface Definition Language (OVSDB IDL). |     """Open vSwitch Database Interface Definition Language (OVSDB IDL). | ||||||
|  |  | ||||||
|     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC |     The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC | ||||||
| @@ -264,6 +268,17 @@ class Idl: | |||||||
|             self.lock_name = lock_name |             self.lock_name = lock_name | ||||||
|             self.__send_lock_request() |             self.__send_lock_request() | ||||||
|  |  | ||||||
|  |     def notify(self, event, row, updates=None): | ||||||
|  |         """Hook for implementing create/update/delete notifications | ||||||
|  |  | ||||||
|  |         :param event:   The event that was triggered | ||||||
|  |         :type event:    ROW_CREATE, ROW_UPDATE, or ROW_DELETE | ||||||
|  |         :param row:     The row as it is after the operation has occured | ||||||
|  |         :type row:      Row | ||||||
|  |         :param updates: For updates, a Row object with just the changed columns | ||||||
|  |         :type updates:  Row | ||||||
|  |         """ | ||||||
|  |  | ||||||
|     def __clear(self): |     def __clear(self): | ||||||
|         changed = False |         changed = False | ||||||
|  |  | ||||||
| @@ -386,6 +401,7 @@ class Idl: | |||||||
|             if row: |             if row: | ||||||
|                 del table.rows[uuid] |                 del table.rows[uuid] | ||||||
|                 changed = True |                 changed = True | ||||||
|  |                 self.notify(ROW_DELETE, row) | ||||||
|             else: |             else: | ||||||
|                 # XXX rate-limit |                 # XXX rate-limit | ||||||
|                 vlog.warn("cannot delete missing row %s from table %s" |                 vlog.warn("cannot delete missing row %s from table %s" | ||||||
| @@ -401,15 +417,19 @@ class Idl: | |||||||
|                           % (uuid, table.name)) |                           % (uuid, table.name)) | ||||||
|             if self.__row_update(table, row, new): |             if self.__row_update(table, row, new): | ||||||
|                 changed = True |                 changed = True | ||||||
|  |                 self.notify(ROW_CREATE, row) | ||||||
|         else: |         else: | ||||||
|  |             op = ROW_UPDATE | ||||||
|             if not row: |             if not row: | ||||||
|                 row = self.__create_row(table, uuid) |                 row = self.__create_row(table, uuid) | ||||||
|                 changed = True |                 changed = True | ||||||
|  |                 op = ROW_CREATE | ||||||
|                 # XXX rate-limit |                 # XXX rate-limit | ||||||
|                 vlog.warn("cannot modify missing row %s in table %s" |                 vlog.warn("cannot modify missing row %s in table %s" | ||||||
|                           % (uuid, table.name)) |                           % (uuid, table.name)) | ||||||
|             if self.__row_update(table, row, new): |             if self.__row_update(table, row, new): | ||||||
|                 changed = True |                 changed = True | ||||||
|  |                 self.notify(op, row, Row.from_json(self, table, uuid, old)) | ||||||
|         return changed |         return changed | ||||||
|  |  | ||||||
|     def __row_update(self, table, row, row_json): |     def __row_update(self, table, row, row_json): | ||||||
| @@ -570,6 +590,26 @@ class Row(object): | |||||||
|             return |             return | ||||||
|         self._idl.txn._write(self, column, datum) |         self._idl.txn._write(self, column, datum) | ||||||
|  |  | ||||||
|  |     @classmethod | ||||||
|  |     def from_json(cls, idl, table, uuid, row_json): | ||||||
|  |         data = {} | ||||||
|  |         for column_name, datum_json in row_json.iteritems(): | ||||||
|  |             column = table.columns.get(column_name) | ||||||
|  |             if not column: | ||||||
|  |                 # XXX rate-limit | ||||||
|  |                 vlog.warn("unknown column %s in table %s" | ||||||
|  |                           % (column_name, table.name)) | ||||||
|  |                 continue | ||||||
|  |             try: | ||||||
|  |                 datum = ovs.db.data.Datum.from_json(column.type, datum_json) | ||||||
|  |             except error.Error, e: | ||||||
|  |                 # XXX rate-limit | ||||||
|  |                 vlog.warn("error parsing column %s in table %s: %s" | ||||||
|  |                           % (column_name, table.name, e)) | ||||||
|  |                 continue | ||||||
|  |             data[column_name] = datum | ||||||
|  |         return cls(idl, table, uuid, data) | ||||||
|  |  | ||||||
|     def verify(self, column_name): |     def verify(self, column_name): | ||||||
|         """Causes the original contents of column 'column_name' in this row to |         """Causes the original contents of column 'column_name' in this row to | ||||||
|         be verified as a prerequisite to completing the transaction.  That is, |         be verified as a prerequisite to completing the transaction.  That is, | ||||||
|   | |||||||
| @@ -497,6 +497,23 @@ OVSDB_CHECK_IDL_PY([getattr idl, insert ops], | |||||||
| 003: done | 003: done | ||||||
| ]]) | ]]) | ||||||
|  |  | ||||||
|  | OVSDB_CHECK_IDL_PY([row-from-json idl, whats this], | ||||||
|  |   [['["idltest", | ||||||
|  |       {"op": "insert", | ||||||
|  |        "table": "simple", | ||||||
|  |        "row": {"i": 1}}, | ||||||
|  |       {"op": "insert", | ||||||
|  |        "table": "simple", | ||||||
|  |        "row": {}}]']], | ||||||
|  |   [['notifytest insert 2, notifytest set 1 b 1, notifytest delete 0']], | ||||||
|  |   [[000: i=0 r=0 b=false s= u=<0> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1> | ||||||
|  | 000: i=1 r=0 b=false s= u=<0> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<2> | ||||||
|  | 001: commit, status=success, events=create|2|None, delete|0|None, update|1|b | ||||||
|  | 002: i=1 r=0 b=true s= u=<0> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<2> | ||||||
|  | 002: i=2 r=0 b=false s= u=<0> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3> | ||||||
|  | 003: done | ||||||
|  | ]]) | ||||||
|  |  | ||||||
| AT_SETUP([idl handling of missing tables and columns - C]) | AT_SETUP([idl handling of missing tables and columns - C]) | ||||||
| AT_KEYWORDS([ovsdb server idl positive]) | AT_KEYWORDS([ovsdb server idl positive]) | ||||||
| OVS_RUNDIR=`pwd`; export OVS_RUNDIR | OVS_RUNDIR=`pwd`; export OVS_RUNDIR | ||||||
|   | |||||||
| @@ -228,11 +228,24 @@ def idltest_find_simple(idl, i): | |||||||
| def idl_set(idl, commands, step): | def idl_set(idl, commands, step): | ||||||
|     txn = ovs.db.idl.Transaction(idl) |     txn = ovs.db.idl.Transaction(idl) | ||||||
|     increment = False |     increment = False | ||||||
|  |     events = [] | ||||||
|     for command in commands.split(','): |     for command in commands.split(','): | ||||||
|         words = command.split() |         words = command.split() | ||||||
|         name = words[0] |         name = words[0] | ||||||
|         args = words[1:] |         args = words[1:] | ||||||
|  |  | ||||||
|  |         if name == "notifytest": | ||||||
|  |             name = args[0] | ||||||
|  |             args = args[1:] | ||||||
|  |             old_notify = idl.notify | ||||||
|  |  | ||||||
|  |             def notify(event, row, updates=None): | ||||||
|  |                 upcol = updates._data.keys()[0] if updates else None | ||||||
|  |                 events.append("%s|%s|%s" % (event, row.i, upcol)) | ||||||
|  |                 idl.notify = old_notify | ||||||
|  |  | ||||||
|  |             idl.notify = notify | ||||||
|  |  | ||||||
|         if name == "set": |         if name == "set": | ||||||
|             if len(args) != 3: |             if len(args) != 3: | ||||||
|                 sys.stderr.write('"set" command requires 3 arguments\n') |                 sys.stderr.write('"set" command requires 3 arguments\n') | ||||||
| @@ -338,6 +351,10 @@ def idl_set(idl, commands, step): | |||||||
|                      % (step, ovs.db.idl.Transaction.status_to_string(status))) |                      % (step, ovs.db.idl.Transaction.status_to_string(status))) | ||||||
|     if increment and status == ovs.db.idl.Transaction.SUCCESS: |     if increment and status == ovs.db.idl.Transaction.SUCCESS: | ||||||
|         sys.stdout.write(", increment=%d" % txn.get_increment_new_value()) |         sys.stdout.write(", increment=%d" % txn.get_increment_new_value()) | ||||||
|  |     if events: | ||||||
|  |         # Event notifications from operations in a single transaction are | ||||||
|  |         # not in a gauranteed order due to update messages being dicts | ||||||
|  |         sys.stdout.write(", events=" + ", ".join(sorted(events))) | ||||||
|     sys.stdout.write("\n") |     sys.stdout.write("\n") | ||||||
|     sys.stdout.flush() |     sys.stdout.flush() | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user