Optimize upsert.

This commit is contained in:
Aaron L 2016-09-14 23:03:05 -07:00
parent 78de983d7d
commit 09eeef63af
3 changed files with 102 additions and 47 deletions

View file

@ -23,7 +23,7 @@ type (
} }
) )
// Cache for insert and update // Cache for insert, update and upsert
var ( var (
{{$varNameSingular}}Type = reflect.TypeOf(&{{$tableNameSingular}}{}) {{$varNameSingular}}Type = reflect.TypeOf(&{{$tableNameSingular}}{})
{{$varNameSingular}}Mapping = queries.MakeStructMapping({{$varNameSingular}}Type) {{$varNameSingular}}Mapping = queries.MakeStructMapping({{$varNameSingular}}Type)
@ -31,6 +31,8 @@ var (
{{$varNameSingular}}InsertCache = make(map[string]insertCache) {{$varNameSingular}}InsertCache = make(map[string]insertCache)
{{$varNameSingular}}UpdateCacheMut sync.RWMutex {{$varNameSingular}}UpdateCacheMut sync.RWMutex
{{$varNameSingular}}UpdateCache = make(map[string]updateCache) {{$varNameSingular}}UpdateCache = make(map[string]updateCache)
{{$varNameSingular}}UpsertCacheMut sync.RWMutex
{{$varNameSingular}}UpsertCache = make(map[string]insertCache)
) )
// Force time package dependency for automated UpdatedAt/CreatedAt. // Force time package dependency for automated UpdatedAt/CreatedAt.

View file

@ -20,7 +20,7 @@ func (o *{{$tableNameSingular}}) UpsertP(exec boil.Executor, {{if ne .DriverName
panic(boil.WrapErr(err)) panic(boil.WrapErr(err))
} }
} }
// Upsert attempts an insert using an executor, and does an update or ignore on conflict. // Upsert attempts an insert using an executor, and does an update or ignore on conflict.
func (o *{{$tableNameSingular}}) Upsert(exec boil.Executor, {{if ne .DriverName "mysql"}}updateOnConflict bool, conflictColumns []string, {{end}}updateColumns []string, whitelist ...string) error { func (o *{{$tableNameSingular}}) Upsert(exec boil.Executor, {{if ne .DriverName "mysql"}}updateOnConflict bool, conflictColumns []string, {{end}}updateColumns []string, whitelist ...string) error {
if o == nil { if o == nil {
@ -35,44 +35,97 @@ func (o *{{$tableNameSingular}}) Upsert(exec boil.Executor, {{if ne .DriverName
} }
{{- end}} {{- end}}
var err error // Build cache key in-line uglily - mysql vs postgres problems
var ret []string buf := strmangle.GetBuffer()
whitelist, ret = strmangle.InsertColumnSet(
{{$varNameSingular}}Columns,
{{$varNameSingular}}ColumnsWithDefault,
{{$varNameSingular}}ColumnsWithoutDefault,
queries.NonZeroDefaultSet({{$varNameSingular}}ColumnsWithDefault, o),
whitelist,
)
update := strmangle.UpdateColumnSet(
{{$varNameSingular}}Columns,
{{$varNameSingular}}PrimaryKeyColumns,
updateColumns,
)
{{if ne .DriverName "mysql" -}} {{if ne .DriverName "mysql" -}}
conflict := conflictColumns if updateOnConflict {
if len(conflict) == 0 { buf.WriteByte('t')
conflict = make([]string, len({{$varNameSingular}}PrimaryKeyColumns)) } else {
copy(conflict, {{$varNameSingular}}PrimaryKeyColumns) buf.WriteByte('f')
}
buf.WriteByte('.')
for _, c := range conflictColumns {
buf.WriteString(c)
}
buf.WriteByte('.')
{{end -}}
for _, c := range updateColumns {
buf.WriteString(c)
}
buf.WriteByte('.')
for _, c := range whitelist {
buf.WriteString(c)
}
key := buf.String()
strmangle.PutBuffer(buf)
{{$varNameSingular}}UpsertCacheMut.RLock()
cache, cached := {{$varNameSingular}}UpsertCache[key]
{{$varNameSingular}}UpsertCacheMut.RUnlock()
var err error
if !cached {
var ret []string
whitelist, ret = strmangle.InsertColumnSet(
{{$varNameSingular}}Columns,
{{$varNameSingular}}ColumnsWithDefault,
{{$varNameSingular}}ColumnsWithoutDefault,
queries.NonZeroDefaultSet({{$varNameSingular}}ColumnsWithDefault, o),
whitelist,
)
update := strmangle.UpdateColumnSet(
{{$varNameSingular}}Columns,
{{$varNameSingular}}PrimaryKeyColumns,
updateColumns,
)
{{if ne .DriverName "mysql" -}}
var conflict []string
if len(conflictColumns) == 0 {
conflict = make([]string, len({{$varNameSingular}}PrimaryKeyColumns))
copy(conflict, {{$varNameSingular}}PrimaryKeyColumns)
}
cache.query = queries.BuildUpsertQueryPostgres(dialect, "{{$schemaTable}}", updateOnConflict, ret, update, conflict, whitelist)
{{- else -}}
cache.query = queries.BuildUpsertQueryMySQL(dialect, "{{.Table.Name}}", update, whitelist)
cache.retQuery = fmt.Sprintf(
"SELECT %s FROM {{.LQ}}{{.Table.Name}}{{.RQ}} WHERE {{whereClause .LQ .RQ 0 .Table.PKey.Columns}}",
strings.Join(strmangle.IdentQuoteSlice(dialect.LQ, dialect.RQ, ret), ","),
)
{{- end}}
cache.valueMapping, err = queries.BindMapping({{$varNameSingular}}Type, {{$varNameSingular}}Mapping, whitelist)
if err != nil {
return err
}
if len(ret) != 0 {
cache.retMapping, err = queries.BindMapping({{$varNameSingular}}Type, {{$varNameSingular}}Mapping, ret)
if err != nil {
return err
}
}
}
value := reflect.Indirect(reflect.ValueOf(o))
values := queries.ValuesFromMapping(value, cache.valueMapping)
var returns []interface{}
if len(cache.retMapping) != 0 {
returns = queries.PtrsFromMapping(value, cache.retMapping)
} }
query := queries.BuildUpsertQueryPostgres(dialect, "{{$schemaTable}}", updateOnConflict, ret, update, conflict, whitelist)
{{- else -}}
query := queries.BuildUpsertQueryMySQL(dialect, "{{.Table.Name}}", update, whitelist)
{{- end}}
if boil.DebugMode { if boil.DebugMode {
fmt.Fprintln(boil.DebugWriter, query) fmt.Fprintln(boil.DebugWriter, cache.query)
fmt.Fprintln(boil.DebugWriter, queries.GetStructValues(o, whitelist...)) fmt.Fprintln(boil.DebugWriter, values)
} }
{{- if .UseLastInsertID}} {{- if .UseLastInsertID}}
result, err := exec.Exec(query, queries.GetStructValues(o, whitelist...)...) result, err := exec.Exec(cache.query, values...)
if err != nil { if err != nil {
return errors.Wrap(err, "{{.PkgName}}: unable to upsert for {{.Table.Name}}") return errors.Wrap(err, "{{.PkgName}}: unable to upsert for {{.Table.Name}}")
} }
if len(ret) == 0 { if len(cache.retMapping) == 0 {
{{if not .NoHooks -}} {{if not .NoHooks -}}
return o.doAfterUpsertHooks(exec) return o.doAfterUpsertHooks(exec)
{{else -}} {{else -}}
@ -99,33 +152,34 @@ func (o *{{$tableNameSingular}}) Upsert(exec boil.Executor, {{if ne .DriverName
} }
} }
if lastID != 0 && len(ret) == 1 { if lastID != 0 && len(cache.retMapping) == 1 {
retQuery := fmt.Sprintf(
"SELECT %s FROM {{.LQ}}{{.Table.Name}}{{.RQ}} WHERE {{whereClause .LQ .RQ 0 .Table.PKey.Columns}}",
strings.Join(strmangle.IdentQuoteSlice(dialect.LQ, dialect.RQ, ret), ","),
)
if boil.DebugMode { if boil.DebugMode {
fmt.Fprintln(boil.DebugWriter, ret) fmt.Fprintln(boil.DebugWriter, cache.retQuery)
fmt.Fprintln(boil.DebugWriter, identifierCols...) fmt.Fprintln(boil.DebugWriter, identifierCols...)
} }
err = exec.QueryRow(retQuery, identifierCols...).Scan(queries.GetStructPointers(o, ret...)...) err = exec.QueryRow(cache.retQuery, identifierCols...).Scan(returns...)
if err != nil { if err != nil {
return errors.Wrap(err, "{{.PkgName}}: unable to populate default values for {{.Table.Name}}") return errors.Wrap(err, "{{.PkgName}}: unable to populate default values for {{.Table.Name}}")
} }
} }
{{- else}} {{- else}}
if len(ret) != 0 { if len(cache.retMapping) != 0 {
err = exec.QueryRow(query, queries.GetStructValues(o, whitelist...)...).Scan(queries.GetStructPointers(o, ret...)...) err = exec.QueryRow(cache.query, values...).Scan(returns...)
} else { } else {
_, err = exec.Exec(query, queries.GetStructValues(o, whitelist...)...) _, err = exec.Exec(cache.query, values...)
} }
if err != nil { if err != nil {
return errors.Wrap(err, "{{.PkgName}}: unable to upsert for {{.Table.Name}}") return errors.Wrap(err, "{{.PkgName}}: unable to upsert for {{.Table.Name}}")
} }
{{- end}} {{- end}}
if !cached {
{{$varNameSingular}}UpsertCacheMut.Lock()
{{$varNameSingular}}UpsertCache[key] = cache
{{$varNameSingular}}UpsertCacheMut.Unlock()
}
{{if not .NoHooks -}} {{if not .NoHooks -}}
return o.doAfterUpsertHooks(exec) return o.doAfterUpsertHooks(exec)
{{- else -}} {{- else -}}

View file

@ -6,15 +6,15 @@ type M map[string]interface{}
// fails or there was a primary key configuration that was not resolvable. // fails or there was a primary key configuration that was not resolvable.
var ErrSyncFail = errors.New("{{.PkgName}}: failed to synchronize data after insert") var ErrSyncFail = errors.New("{{.PkgName}}: failed to synchronize data after insert")
type insertCache struct{ type insertCache struct {
query string query string
retQuery string retQuery string
valueMapping []uint64 valueMapping []uint64
retMapping []uint64 retMapping []uint64
} }
type updateCache struct{ type updateCache struct {
query string query string
valueMapping []uint64 valueMapping []uint64
} }
@ -35,4 +35,3 @@ func makeCacheKey(wl, nzDefaults []string) string {
strmangle.PutBuffer(buf) strmangle.PutBuffer(buf)
return str return str
} }