2
0
mirror of https://github.com/openvswitch/ovs synced 2025-08-31 14:25:26 +00:00

classifier: Defer pvector publication.

This patch adds a new functions classifier_defer() and
classifier_publish(), which control when the classifier modifications
are made available to lookups.  By default, all modifications are made
available to lookups immediately.  Modifications made after a
classifier_defer() call MAY be 'deferred' for later 'publication'.  A
call to classifier_publish() will both publish any deferred
modifications, and cause subsequent changes to to be published
immediately.

Currently any deferring is limited to the visibility of the subtable
vector changes.  pvector now processes modifications mostly in a
working copy, which needs to be explicitly published with
pvector_publish().  pvector_publish() sorts the working copy and
removes gaps before publishing it.

This change helps avoiding O(n**2) memory behavior in corner cases,
where large number of rules with different masks are inserted or
deleted.

VMware-BZ: #1322017
Signed-off-by: Jarno Rajahalme <jrajahalme@nicira.com>
Acked-by: Ben Pfaff <blp@nicira.com>
This commit is contained in:
Jarno Rajahalme
2014-11-13 11:54:31 -08:00
parent d0999f1b34
commit 802f84ffd7
9 changed files with 199 additions and 125 deletions

View File

@@ -248,6 +248,7 @@ classifier_init(struct classifier *cls, const uint8_t *flow_segments)
for (int i = 0; i < CLS_MAX_TRIES; i++) {
trie_init(cls, i, NULL);
}
cls->publish = true;
}
/* Destroys 'cls'. Rules within 'cls', if any, are not freed; this is the
@@ -634,6 +635,11 @@ classifier_replace(struct classifier *cls, const struct cls_rule *rule)
/* Nothing was replaced. */
cls->n_rules++;
if (cls->publish) {
pvector_publish(&cls->subtables);
}
return NULL;
}
@@ -767,6 +773,11 @@ check_priority:
pvector_change_priority(&cls->subtables, subtable, max_priority);
}
}
if (cls->publish) {
pvector_publish(&cls->subtables);
}
free:
ovsrcu_postpone(free, cls_match);
cls->n_rules--;

View File

@@ -252,6 +252,7 @@ struct classifier {
struct cmap partitions; /* Contains "struct cls_partition"s. */
struct cls_trie tries[CLS_MAX_TRIES]; /* Prefix tries. */
unsigned int n_tries;
bool publish; /* Make changes visible to lookups? */
};
/* A rule to be inserted to the classifier. */
@@ -288,6 +289,8 @@ const struct cls_rule *classifier_replace(struct classifier *,
const struct cls_rule *);
const struct cls_rule *classifier_remove(struct classifier *,
const struct cls_rule *);
static inline void classifier_defer(struct classifier *);
static inline void classifier_publish(struct classifier *);
/* Lookups. These are RCU protected and may run concurrently with modifiers
* and each other. */
@@ -343,4 +346,17 @@ void cls_cursor_advance(struct cls_cursor *);
}
#endif
static inline void
classifier_defer(struct classifier *cls)
{
cls->publish = false;
}
static inline void
classifier_publish(struct classifier *cls)
{
cls->publish = true;
pvector_publish(&cls->subtables);
}
#endif /* classifier.h */

View File

@@ -3412,6 +3412,7 @@ dpcls_create_subtable(struct dpcls *cls, const struct netdev_flow_key *mask)
netdev_flow_key_clone(&subtable->mask, mask);
cmap_insert(&cls->subtables_map, &subtable->cmap_node, mask->hash);
pvector_insert(&cls->subtables, subtable, 0);
pvector_publish(&cls->subtables);
return subtable;
}
@@ -3454,6 +3455,7 @@ dpcls_remove(struct dpcls *cls, struct dpcls_rule *rule)
if (cmap_remove(&subtable->rules, &rule->cmap_node, rule->flow.hash)
== 0) {
dpcls_destroy_subtable(cls, subtable);
pvector_publish(&cls->subtables);
}
}

View File

@@ -265,15 +265,17 @@ ovs_router_flush(void)
{
struct ovs_router_entry *rt;
ovs_mutex_lock(&mutex);
classifier_defer(&cls);
CLS_FOR_EACH(rt, cr, &cls) {
if (rt->priority == rt->plen) {
ovs_mutex_lock(&mutex);
if (classifier_remove(&cls, &rt->cr)) {
ovsrcu_postpone(rt_entry_free, rt);
}
ovs_mutex_unlock(&mutex);
}
}
classifier_publish(&cls);
ovs_mutex_unlock(&mutex);
seq_change(tnl_conf_seq);
}

View File

@@ -38,7 +38,14 @@ pvector_impl_alloc(size_t size)
static struct pvector_impl *
pvector_impl_dup(struct pvector_impl *old)
{
return xmemdup(old, sizeof *old + old->allocated * sizeof old->vector[0]);
struct pvector_impl *impl;
size_t alloc = old->size + PVECTOR_EXTRA_ALLOC;
impl = xmalloc(sizeof *impl + alloc * sizeof impl->vector[0]);
impl->allocated = alloc;
impl->size = old->size;
memcpy(impl->vector, old->vector, old->size * sizeof old->vector[0]);
return impl;
}
/* Initializes 'pvec' as an empty concurrent priority vector. */
@@ -46,6 +53,7 @@ void
pvector_init(struct pvector *pvec)
{
ovsrcu_set(&pvec->impl, pvector_impl_alloc(PVECTOR_EXTRA_ALLOC));
pvec->temp = NULL;
}
/* Destroys 'pvec'.
@@ -55,6 +63,8 @@ pvector_init(struct pvector *pvec)
void
pvector_destroy(struct pvector *pvec)
{
free(pvec->temp);
pvec->temp = NULL;
ovsrcu_postpone(free, pvector_impl_get(pvec));
ovsrcu_set(&pvec->impl, NULL); /* Poison. */
}
@@ -81,23 +91,10 @@ static void
pvector_impl_sort(struct pvector_impl *impl)
{
qsort(impl->vector, impl->size, sizeof *impl->vector, pvector_entry_cmp);
}
/* Returns the index with priority equal or lower than 'target_priority',
* which will be one past the vector if none exists. */
static int
pvector_impl_find_priority(struct pvector_impl *impl,
int target_priority)
{
const struct pvector_entry *entry;
int index;
PVECTOR_IMPL_FOR_EACH (entry, index, impl) {
if (entry->priority <= target_priority) {
break;
}
/* Trim gaps. */
while (impl->size && impl->vector[impl->size - 1].priority == INT_MIN) {
impl->size = impl->size - 1;
}
return index;
}
/* Returns the index of the 'ptr' in the vector, or -1 if none is found. */
@@ -118,15 +115,13 @@ pvector_impl_find(struct pvector_impl *impl, void *target)
void
pvector_insert(struct pvector *pvec, void *ptr, int priority)
{
struct pvector_impl *old, *new;
int index;
struct pvector_impl *temp = pvec->temp;
struct pvector_impl *old = pvector_impl_get(pvec);
ovs_assert(ptr != NULL);
old = pvector_impl_get(pvec);
/* Check if can add to the end without reallocation. */
if (old->allocated > old->size &&
if (!temp && old->allocated > old->size &&
(!old->size || priority <= old->vector[old->size - 1].priority)) {
old->vector[old->size].ptr = ptr;
old->vector[old->size].priority = priority;
@@ -135,78 +130,80 @@ pvector_insert(struct pvector *pvec, void *ptr, int priority)
atomic_thread_fence(memory_order_release);
++old->size;
} else {
new = pvector_impl_alloc(old->size + 1 + PVECTOR_EXTRA_ALLOC);
index = pvector_impl_find_priority(old, priority);
/* Now at the insertion index. */
memcpy(new->vector, old->vector, index * sizeof old->vector[0]);
new->vector[index].ptr = ptr;
new->vector[index].priority = priority;
memcpy(&new->vector[index + 1], &old->vector[index],
(old->size - index) * sizeof old->vector[0]);
new->size = old->size + 1;
ovsrcu_set(&pvec->impl, new);
ovsrcu_postpone(free, old);
if (!temp) {
temp = pvector_impl_dup(old);
pvec->temp = temp;
} else if (temp->size == temp->allocated) {
temp = pvector_impl_dup(temp);
free(pvec->temp);
pvec->temp = temp;
}
/* Insert at the end, publish will sort. */
temp->vector[temp->size].ptr = ptr;
temp->vector[temp->size].priority = priority;
temp->size += 1;
}
}
void
pvector_remove(struct pvector *pvec, void *ptr)
{
struct pvector_impl *old, *new;
struct pvector_impl *temp = pvec->temp;
int index;
old = pvector_impl_get(pvec);
ovs_assert(old->size > 0);
index = pvector_impl_find(old, ptr);
if (!temp) {
temp = pvector_impl_dup(pvector_impl_get(pvec));
pvec->temp = temp;
}
ovs_assert(temp->size > 0);
index = pvector_impl_find(temp, ptr);
ovs_assert(index >= 0);
/* Now at the index of the entry to be deleted. */
/* We do not try to delete the last entry without reallocation so that
* the readers can read the 'size' once in the beginning of each iteration.
*/
/* Keep extra space for insertions to the end. */
new = pvector_impl_alloc(old->size - 1 + PVECTOR_EXTRA_ALLOC);
memcpy(new->vector, old->vector, index * sizeof old->vector[0]);
memcpy(&new->vector[index], &old->vector[index + 1],
(old->size - (index + 1)) * sizeof old->vector[0]);
new->size = old->size - 1;
ovsrcu_set(&pvec->impl, new);
ovsrcu_postpone(free, old);
/* Now at the index of the entry to be deleted.
* Clear in place, publish will sort and clean these off. */
temp->vector[index].ptr = NULL;
temp->vector[index].priority = INT_MIN;
}
/* Change entry's 'priority' and keep the vector ordered. */
void
pvector_change_priority(struct pvector *pvec, void *ptr, int priority)
{
struct pvector_impl *old = pvector_impl_get(pvec);
int index = pvector_impl_find(old, ptr);
struct pvector_impl *old = pvec->temp;
int index;
if (!old) {
old = pvector_impl_get(pvec);
}
index = pvector_impl_find(old, ptr);
ovs_assert(index >= 0);
/* Now at the index of the entry to be updated. */
/* Check if can not update in place. */
if ((priority > old->vector[index].priority && index > 0
&& priority > old->vector[index - 1].priority)
|| (priority < old->vector[index].priority && index < old->size - 1
&& priority < old->vector[index + 1].priority)) {
/* Have to reallocate to reorder. */
struct pvector_impl *new = pvector_impl_dup(old);
new->vector[index].priority = priority;
pvector_impl_sort(new);
ovsrcu_set(&pvec->impl, new);
ovsrcu_postpone(free, old);
} else {
/* Can update in place. Readers are free to use either value,
* so we do not try to synchronize here. */
old->vector[index].priority = priority;
/* Have to use a temp. */
if (!pvec->temp) {
/* Have to reallocate to reorder. */
pvec->temp = pvector_impl_dup(old);
old = pvec->temp;
/* Publish will sort. */
}
}
old->vector[index].priority = priority;
}
/* Make the modified pvector available for iteration. */
void pvector_publish__(struct pvector *pvec)
{
struct pvector_impl *temp = pvec->temp;
pvec->temp = NULL;
pvector_impl_sort(temp); /* Also removes gaps. */
ovsrcu_postpone(free, ovsrcu_get_protected(struct pvector_impl *,
&pvec->impl));
ovsrcu_set(&pvec->impl, temp);
}

View File

@@ -19,6 +19,7 @@
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include "ovs-rcu.h"
#include "util.h"
@@ -47,6 +48,13 @@
* not change the ordering of the entries. Writers will never change the 'ptr'
* values, or decrement the 'size' on a copy that readers have access to.
*
* Most modifications are internally staged at the 'temp' vector, from which
* they can be published at 'impl' by calling pvector_publish(). This saves
* unnecessary memory allocations when many changes are done back-to-back.
* 'temp' may contain NULL pointers and it may be in unsorted order. It is
* sorted before it is published at 'impl', which also removes the NULLs from
* the published vector.
*
* Clients should not use priority INT_MIN.
*/
@@ -68,21 +76,25 @@ struct pvector_impl {
/* Concurrent priority vector. */
struct pvector {
OVSRCU_TYPE(struct pvector_impl *) impl;
struct pvector_impl *temp;
};
/* Initialization. */
void pvector_init(struct pvector *);
void pvector_destroy(struct pvector *);
/* Count. */
static inline size_t pvector_count(const struct pvector *);
static inline bool pvector_is_empty(const struct pvector *);
/* Insertion and deletion. */
/* Insertion and deletion. These work on 'temp'. */
void pvector_insert(struct pvector *, void *, int priority);
void pvector_change_priority(struct pvector *, void *, int priority);
void pvector_remove(struct pvector *, void *);
/* Make the modified pvector available for iteration. */
static inline void pvector_publish(struct pvector *);
/* Count. These operate on the published pvector. */
static inline size_t pvector_count(const struct pvector *);
static inline bool pvector_is_empty(const struct pvector *);
/* Iteration.
*
*
@@ -216,4 +228,14 @@ static inline bool pvector_is_empty(const struct pvector *pvec)
return pvector_count(pvec) == 0;
}
void pvector_publish__(struct pvector *);
/* Make the modified pvector available for iteration. */
static inline void pvector_publish(struct pvector *pvec)
{
if (pvec->temp) {
pvector_publish__(pvec);
}
}
#endif /* pvector.h */

View File

@@ -89,8 +89,6 @@ static void oftable_enable_eviction(struct oftable *,
size_t n_fields);
static void oftable_remove_rule(struct rule *rule) OVS_REQUIRES(ofproto_mutex);
static void oftable_remove_rule__(struct ofproto *, struct rule *)
OVS_REQUIRES(ofproto_mutex);
/* A set of rules within a single OpenFlow table (oftable) that have the same
* values for the oftable's eviction_fields. A rule to be evicted, when one is
@@ -237,6 +235,8 @@ struct ofport_usage {
/* rule. */
static void ofproto_rule_send_removed(struct rule *, uint8_t reason);
static bool rule_is_readonly(const struct rule *);
static void ofproto_rule_remove__(struct ofproto *, struct rule *)
OVS_REQUIRES(ofproto_mutex);
/* The source of a flow_mod request, in the code that processes flow_mods.
*
@@ -1332,18 +1332,6 @@ ofproto_get_snoops(const struct ofproto *ofproto, struct sset *snoops)
connmgr_get_snoops(ofproto->connmgr, snoops);
}
static void
ofproto_rule_delete__(struct rule *rule, uint8_t reason)
OVS_REQUIRES(ofproto_mutex)
{
struct rule_collection rules;
rules.rules = rules.stub;
rules.n = 1;
rules.stub[0] = rule;
delete_flows__(&rules, reason, NULL);
}
/* Deletes 'rule' from 'ofproto'.
*
* Within an ofproto implementation, this function allows an ofproto
@@ -1361,7 +1349,7 @@ ofproto_rule_delete(struct ofproto *ofproto, struct rule *rule)
* switch is being deleted and any OpenFlow channels have been or soon will
* be killed. */
ovs_mutex_lock(&ofproto_mutex);
oftable_remove_rule__(ofproto, rule);
oftable_remove_rule(rule);
ofproto->ofproto_class->rule_delete(rule);
ovs_mutex_unlock(&ofproto_mutex);
}
@@ -1385,15 +1373,20 @@ ofproto_flush__(struct ofproto *ofproto)
ovs_mutex_lock(&ofproto_mutex);
OFPROTO_FOR_EACH_TABLE (table, ofproto) {
struct rule_collection rules;
struct rule *rule;
if (table->flags & OFTABLE_HIDDEN) {
continue;
}
rule_collection_init(&rules);
CLS_FOR_EACH (rule, cr, &table->cls) {
ofproto_rule_delete__(rule, OFPRR_DELETE);
rule_collection_add(&rules, rule);
}
delete_flows__(&rules, OFPRR_DELETE, NULL);
rule_collection_destroy(&rules);
}
/* XXX: Concurrent handler threads may insert new learned flows based on
* learn actions of the now deleted flows right after we release
@@ -4117,29 +4110,32 @@ handle_queue_stats_request(struct ofconn *ofconn,
return error;
}
static bool
should_evict_a_rule(struct oftable *table, unsigned int extra_space)
OVS_REQUIRES(ofproto_mutex)
OVS_NO_THREAD_SAFETY_ANALYSIS
{
return classifier_count(&table->cls) + extra_space > table->max_flows;
}
static enum ofperr
evict_rules_from_table(struct oftable *table, unsigned int extra_space)
OVS_REQUIRES(ofproto_mutex)
{
while (should_evict_a_rule(table, extra_space)) {
enum ofperr error = 0;
struct rule_collection rules;
unsigned int count = classifier_count(&table->cls) + extra_space;
unsigned int max_flows = table->max_flows;
rule_collection_init(&rules);
while (count-- > max_flows) {
struct rule *rule;
if (!choose_rule_to_evict(table, &rule)) {
return OFPERR_OFPFMFC_TABLE_FULL;
error = OFPERR_OFPFMFC_TABLE_FULL;
break;
} else {
ofproto_rule_delete__(rule, OFPRR_EVICTION);
eviction_group_remove_rule(rule);
rule_collection_add(&rules, rule);
}
}
delete_flows__(&rules, OFPRR_EVICTION, NULL);
rule_collection_destroy(&rules);
return 0;
return error;
}
/* Implements OFPFC_ADD and the cases for OFPFC_MODIFY and OFPFC_MODIFY_STRICT
@@ -4287,6 +4283,7 @@ add_flow(struct ofproto *ofproto, struct ofputil_flow_mod *fm,
meter_insert_rule(rule);
}
classifier_defer(&table->cls);
classifier_insert(&table->cls, &rule->cr);
error = ofproto->ofproto_class->rule_insert(rule);
@@ -4295,6 +4292,8 @@ add_flow(struct ofproto *ofproto, struct ofputil_flow_mod *fm,
ofproto_rule_unref(rule);
return error;
}
classifier_publish(&table->cls);
learned_cookies_inc(ofproto, actions);
if (minimask_get_vid_mask(&rule->cr.match.mask) == VLAN_VID_MASK) {
@@ -4513,21 +4512,34 @@ delete_flows__(const struct rule_collection *rules,
if (rules->n) {
struct list dead_cookies = LIST_INITIALIZER(&dead_cookies);
struct ofproto *ofproto = rules->rules[0]->ofproto;
struct rule *rule, *next;
size_t i;
for (i = 0; i < rules->n; i++) {
struct rule *rule = rules->rules[i];
const struct rule_actions *actions = rule_get_actions(rule);
for (i = 0, next = rules->rules[0];
rule = next, next = (++i < rules->n) ? rules->rules[i] : NULL,
rule; ) {
struct classifier *cls = &ofproto->tables[rule->table_id].cls;
uint8_t next_table = next ? next->table_id : UINT8_MAX;
ofproto_rule_send_removed(rule, reason);
ofmonitor_report(ofproto->connmgr, rule, NXFME_DELETED, reason,
req ? req->ofconn : NULL, req ? req->xid : 0,
NULL);
oftable_remove_rule(rule);
if (next_table == rule->table_id) {
classifier_defer(cls);
}
classifier_remove(cls, &rule->cr);
if (next_table != rule->table_id) {
classifier_publish(cls);
}
ofproto_rule_remove__(ofproto, rule);
ofproto->ofproto_class->rule_delete(rule);
learned_cookies_dec(ofproto, actions, &dead_cookies);
learned_cookies_dec(ofproto, rule_get_actions(rule),
&dead_cookies);
}
learned_cookies_flush(ofproto, &dead_cookies);
ofmonitor_flush(ofproto->connmgr);
@@ -4553,7 +4565,7 @@ delete_flows_loose(struct ofproto *ofproto,
error = collect_rules_loose(ofproto, &criteria, &rules);
rule_criteria_destroy(&criteria);
if (!error && rules.n > 0) {
if (!error) {
delete_flows__(&rules, fm->delete_reason, req);
}
rule_collection_destroy(&rules);
@@ -4579,7 +4591,7 @@ delete_flow_strict(struct ofproto *ofproto, const struct ofputil_flow_mod *fm,
error = collect_rules_strict(ofproto, &criteria, &rules);
rule_criteria_destroy(&criteria);
if (!error && rules.n > 0) {
if (!error) {
delete_flows__(&rules, fm->delete_reason, req);
}
rule_collection_destroy(&rules);
@@ -4626,7 +4638,12 @@ void
ofproto_rule_expire(struct rule *rule, uint8_t reason)
OVS_REQUIRES(ofproto_mutex)
{
ofproto_rule_delete__(rule, reason);
struct rule_collection rules;
rules.rules = rules.stub;
rules.n = 1;
rules.stub[0] = rule;
delete_flows__(&rules, reason, NULL);
}
/* Reduces '*timeout' to no more than 'max'. A value of zero in either case
@@ -5308,9 +5325,7 @@ handle_delete_meter(struct ofconn *ofconn, struct ofputil_meter_mod *mm)
}
}
}
if (rules.n > 0) {
delete_flows__(&rules, OFPRR_METER_DELETE, NULL);
}
delete_flows__(&rules, OFPRR_METER_DELETE, NULL);
/* Delete the meters. */
meter_delete(ofproto, first, last);
@@ -6770,15 +6785,12 @@ oftable_enable_eviction(struct oftable *table,
}
}
/* Removes 'rule' from the oftable that contains it. */
/* Removes 'rule' from the ofproto data structures AFTER caller has removed
* it from the classifier. */
static void
oftable_remove_rule__(struct ofproto *ofproto, struct rule *rule)
ofproto_rule_remove__(struct ofproto *ofproto, struct rule *rule)
OVS_REQUIRES(ofproto_mutex)
{
struct classifier *cls = &ofproto->tables[rule->table_id].cls;
classifier_remove(cls, &rule->cr);
cookies_remove(ofproto, rule);
eviction_group_remove_rule(rule);
@@ -6795,7 +6807,11 @@ static void
oftable_remove_rule(struct rule *rule)
OVS_REQUIRES(ofproto_mutex)
{
oftable_remove_rule__(rule->ofproto, rule);
struct classifier *cls = &rule->ofproto->tables[rule->table_id].cls;
if (classifier_remove(cls, &rule->cr)) {
ofproto_rule_remove__(rule->ofproto, rule);
}
}
/* unixctl commands. */

View File

@@ -453,6 +453,7 @@ destroy_classifier(struct classifier *cls)
{
struct test_rule *rule;
classifier_defer(cls);
CLS_FOR_EACH (rule, cls_rule, cls) {
if (classifier_remove(cls, &rule->cls_rule)) {
ovsrcu_postpone(free_rule, rule);
@@ -785,6 +786,7 @@ test_rule_replacement(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
ovsrcu_postpone(free_rule, rule1);
compare_classifiers(&cls, &tcls);
check_tables(&cls, 1, 1, 0);
classifier_defer(&cls);
classifier_remove(&cls, &rule2->cls_rule);
tcls_destroy(&tcls);
@@ -921,6 +923,7 @@ test_many_rules_in_one_list (int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
check_tables(&cls, n > 0, n, n - 1);
}
classifier_defer(&cls);
for (i = 0; i < N_RULES; i++) {
if (classifier_remove(&cls, &rules[i]->cls_rule)) {
ovsrcu_postpone(free_rule, rules[i]);

View File

@@ -2368,6 +2368,7 @@ fte_free_all(struct classifier *cls)
{
struct fte *fte;
classifier_defer(cls);
CLS_FOR_EACH (fte, rule, cls) {
classifier_remove(cls, &fte->rule);
ovsrcu_postpone(fte_free, fte);
@@ -2418,6 +2419,7 @@ read_flows_from_file(const char *filename, struct classifier *cls, int index)
ds_init(&s);
usable_protocols = OFPUTIL_P_ANY;
line_number = 0;
classifier_defer(cls);
while (!ds_get_preprocessed_line(&s, file, &line_number)) {
struct fte_version *version;
struct ofputil_flow_mod fm;
@@ -2442,6 +2444,7 @@ read_flows_from_file(const char *filename, struct classifier *cls, int index)
fte_insert(cls, &fm.match, fm.priority, version, index);
}
classifier_publish(cls);
ds_destroy(&s);
if (file != stdin) {
@@ -2530,6 +2533,7 @@ read_flows_from_switch(struct vconn *vconn,
reply = NULL;
ofpbuf_init(&ofpacts, 0);
classifier_defer(cls);
while (recv_flow_stats_reply(vconn, send_xid, &reply, &fs, &ofpacts)) {
struct fte_version *version;
@@ -2544,6 +2548,7 @@ read_flows_from_switch(struct vconn *vconn,
fte_insert(cls, &fs.match, fs.priority, version, index);
}
classifier_publish(cls);
ofpbuf_uninit(&ofpacts);
}