@@ -15,24 +15,27 @@ import (
1515 "github.com/go-sql-driver/mysql"
1616 "github.com/gosuri/uiprogress"
1717 "github.com/kr/pretty"
18- "github.com/pkg/errors"
1918
2019 log "github.com/sirupsen/logrus"
2120 kingpin "gopkg.in/alecthomas/kingpin.v2"
2221)
2322
2423var (
25- app = kingpin .New ("mysql_random_data_loader" , "MySQL Random Data Loader" )
24+ app = kingpin .New ("mysql_random_data_loader" , "MySQL Random Data Loader" )
25+
26+ bulkSize = app .Flag ("bulk-size" , "Number of rows per insert statement" ).Default ("1000" ).Int ()
27+ debug = app .Flag ("debug" , "Log debugging information" ).Bool ()
28+ factor = app .Flag ("fk-samples-factor" , "Percentage used to get random samples for foreign keys fields" ).Default ("0.3" ).Float64 ()
2629 host = app .Flag ("host" , "Host name/IP" ).Short ('h' ).Default ("127.0.0.1" ).String ()
30+ maxRetries = app .Flag ("max-retries" , "Number of rows to insert" ).Default ("100" ).Int ()
31+ maxThreads = app .Flag ("max-threads" , "Maximum number of threads to run inserts" ).Default ("1" ).Int ()
32+ noProgress = app .Flag ("no-progress" , "Show progress bar" ).Default ("false" ).Bool ()
33+ pass = app .Flag ("password" , "Password" ).Short ('p' ).String ()
2734 port = app .Flag ("port" , "Port" ).Short ('P' ).Default ("3306" ).Int ()
35+ print = app .Flag ("print" , "Print queries to the standard output instead of inserting them into the db" ).Bool ()
36+ samples = app .Flag ("max-fk-samples" , "Maximum number of samples for foreign keys fields" ).Default ("100" ).Int64 ()
2837 user = app .Flag ("user" , "User" ).Short ('u' ).String ()
29- pass = app .Flag ("password" , "Password" ).Short ('p' ).String ()
30- maxThreads = app .Flag ("max-threads" , "Maximum number of threads to run inserts" ).Int ()
31- debug = app .Flag ("debug" , "Log debugging information" ).Bool ()
32- bulkSize = app .Flag ("bulk-size" , "Number of rows per insert statement" ).Default ("1000" ).Int ()
33- noProgress = app .Flag ("no-progressbar" , "Show progress bar" ).Default ("false" ).Bool ()
34- qps = app .Flag ("qps" , "Queries per second. 0 = unlimited" ).Default ("0" ).Int ()
35- maxRetries = app .Flag ("max-retries" , "Number of rows to insert" ).Default ("100" ).Int ()
38+ version = app .Flag ("version" , "Show version and exit" ).Bool ()
3639
3740 schema = app .Arg ("database" , "Database" ).Required ().String ()
3841 tableName = app .Arg ("table" , "Table" ).Required ().String ()
@@ -50,12 +53,32 @@ var (
5053 "double" : 0x7FFFFFFF ,
5154 "bigint" : 0x7FFFFFFFFFFFFFFF ,
5255 }
56+
57+ Version = "0.0.0."
58+ Commit = "<sha1>"
59+ Branch = "branch-name"
60+ Build = "2017-01-01"
61+ GoVersion = "1.9.2"
5362)
5463
5564type insertValues []getters.Getter
65+ type insertFunction func (* sql.DB , string , chan int , chan bool , * sync.WaitGroup )
5666
5767func main () {
58- kingpin .MustParse (app .Parse (os .Args [1 :]))
68+ _ , err := app .Parse (os .Args [1 :])
69+
70+ if * version {
71+ fmt .Printf ("Version : %s\n " , Version )
72+ fmt .Printf ("Commit : %s\n " , Commit )
73+ fmt .Printf ("Branch : %s\n " , Branch )
74+ fmt .Printf ("Build : %s\n " , Build )
75+ fmt .Printf ("Go version: %s\n " , GoVersion )
76+ return
77+ }
78+ if err != nil {
79+ log .Errorln (err )
80+ os .Exit (1 )
81+ }
5982
6083 address := * host
6184 net := "unix"
@@ -98,6 +121,7 @@ func main() {
98121 log .SetFormatter (& log.TextFormatter {FullTimestamp : true })
99122 if * debug {
100123 log .SetLevel (log .DebugLevel )
124+ * noProgress = true
101125 }
102126 log .Debug (pretty .Sprint (table ))
103127
@@ -121,11 +145,8 @@ func main() {
121145 * maxThreads = 1
122146 }
123147
124- log .Info ("Starting" )
125-
126- bar := uiprogress .AddBar (* rows ).AppendCompleted ().PrependElapsed ()
127- if ! * noProgress {
128- uiprogress .Start ()
148+ if ! * print {
149+ log .Info ("Starting" )
129150 }
130151
131152 // Example: want 11 rows with bulksize 4:
@@ -135,16 +156,43 @@ func main() {
135156 // remainder = rows - count = 11 - 8 = 3
136157 // And then, we need to run this insert once to complete 11 rows
137158 // INSERT INTO table (f1, f2) VALUES (?, ?), (?, ?), (?, ?)
159+ newLineOnEachRow := false
138160 count := * rows / * bulkSize
139161 remainder := * rows - count * * bulkSize
140162 semaphores := makeSemaphores (* maxThreads )
141163 rowValues := makeValueFuncs (db , table .Fields )
142164 log .Debugf ("Must run %d bulk inserts having %d rows each" , count , * bulkSize )
143- okCount , err := run (db , table , bar , semaphores , rowValues , count , * bulkSize , * qps )
165+
166+ var runInsertFunc insertFunction
167+ runInsertFunc = runInsert
168+ if * print {
169+ * maxThreads = 1
170+ * noProgress = true
171+ newLineOnEachRow = true
172+ runInsertFunc = func (db * sql.DB , insertQuery string , resultsChan chan int , sem chan bool , wg * sync.WaitGroup ) {
173+ fmt .Println (insertQuery )
174+ resultsChan <- * bulkSize
175+ sem <- true
176+ wg .Done ()
177+ }
178+ }
179+
180+ bar := uiprogress .AddBar (* rows ).AppendCompleted ().PrependElapsed ()
181+ if ! * noProgress {
182+ uiprogress .Start ()
183+ }
184+
185+ okCount , err := run (db , table , bar , semaphores , rowValues , count , * bulkSize , runInsertFunc , newLineOnEachRow )
186+ if err != nil {
187+ log .Errorln (err )
188+ }
144189 var okrCount , okiCount int // remainder & individual inserts OK count
145190 if remainder > 0 {
146191 log .Debugf ("Must run 1 extra bulk insert having %d rows, to complete %d rows" , remainder , * rows )
147- okrCount , err = run (db , table , bar , semaphores , rowValues , 1 , remainder , * qps )
192+ okrCount , err = run (db , table , bar , semaphores , rowValues , 1 , remainder , runInsertFunc , newLineOnEachRow )
193+ if err != nil {
194+ log .Errorln (err )
195+ }
148196 }
149197
150198 // If there were errors and at this point we have less rows than *rows,
@@ -155,47 +203,69 @@ func main() {
155203 log .Debugf ("Running extra %d individual inserts (duplicated keys?)" , * rows - totalOkCount )
156204 }
157205 for totalOkCount < * rows && retries < * maxRetries {
158- okiCount , _ = run (db , table , bar , semaphores , rowValues , * rows - totalOkCount , 1 , * qps )
206+ okiCount , err = run (db , table , bar , semaphores , rowValues , * rows - totalOkCount , 1 , runInsertFunc , newLineOnEachRow )
207+ if err != nil {
208+ log .Errorf ("Cannot run extra insert: %s" , err )
209+ }
210+
159211 retries ++
160212 totalOkCount += okiCount
161213 }
162214
163215 time .Sleep (500 * time .Millisecond ) // Let the progress bar to update
164- log .Printf ("%d rows inserted" , totalOkCount )
216+ if ! * print {
217+ log .Printf ("%d rows inserted" , totalOkCount )
218+ }
165219 db .Close ()
166220}
167221
168- func run (db * sql.DB , table * tableparser.Table , bar * uiprogress.Bar , sem chan bool , rowValues insertValues , count , size int , qps int ) (int , error ) {
222+ func run (db * sql.DB , table * tableparser.Table , bar * uiprogress.Bar , sem chan bool ,
223+ rowValues insertValues , count , bulkSize int , insertFunc insertFunction , newLineOnEachRow bool ) (int , error ) {
169224 if count == 0 {
170225 return 0 , nil
171226 }
172227 var wg sync.WaitGroup
228+ insertQuery := generateInsertStmt (table )
229+ rowsChan := make (chan []string , 1000 )
230+ okRowsChan := countRowsOK (count , bar )
173231
174- bulkStmt , err := db .Prepare (generateInsertStmt (table , size ))
175- if err != nil {
176- return 0 , errors .Wrap (err , "Cannot prepare bulk insert" )
232+ go generateInsertData (count * bulkSize , rowValues , rowsChan )
233+ defaultSeparator1 := ""
234+ if newLineOnEachRow {
235+ defaultSeparator1 = "\n "
177236 }
178237
179- rowsChan := make (chan []interface {}, 1000 )
180-
181- okRowsChan := countRowsOK (count , bar )
182-
183- go generateInsertData (count , size , rowValues , rowsChan )
238+ i := 0
239+ rowsCount := 0
240+ sep1 , sep2 := defaultSeparator1 , ""
184241
185- var ticker <- chan time.Time
186- if qps > 0 {
187- delay := time .Second / time .Duration (qps )
188- ticker = time .NewTicker (delay ).C
189- }
190- for i := 0 ; i < count ; i ++ {
242+ for i < count {
191243 rowData := <- rowsChan
192- <- sem
193- if ticker != nil {
194- <- ticker
195- log .Debugf ("QPS in effect. Inserting ..." )
244+ rowsCount ++
245+ insertQuery += sep1 + " ("
246+ for _ , field := range rowData {
247+ insertQuery += sep2 + string (field )
248+ sep2 = ", "
196249 }
250+ insertQuery += ")"
251+ sep1 = ", "
252+ if newLineOnEachRow {
253+ sep1 += "\n "
254+ }
255+ sep2 = ""
256+ if rowsCount < bulkSize {
257+ continue
258+ }
259+
260+ insertQuery += ";\n "
261+ <- sem
197262 wg .Add (1 )
198- go runInsert (bulkStmt , rowData , okRowsChan , sem , & wg )
263+ go insertFunc (db , insertQuery , okRowsChan , sem , & wg )
264+
265+ insertQuery = generateInsertStmt (table )
266+ sep1 , sep2 = defaultSeparator1 , ""
267+ rowsCount = 0
268+ i ++
199269 }
200270
201271 wg .Wait ()
@@ -242,57 +312,40 @@ func countRowsOK(count int, bar *uiprogress.Bar) chan int {
242312// rowsChan <- [ v3-1, v3-2, v3-3, v4-1, v4-2, v4-3 ]
243313// rowsChan <- [ v1-5, v5-2, v5-3, v6-1, v6-2, v6-3 ]
244314//
245- func generateInsertData (count , size int , values insertValues , rowsChan chan []interface {}) {
246- //runtime.LockOSThread()
315+ func generateInsertData (count int , values insertValues , rowsChan chan []string ) {
247316 for i := 0 ; i < count ; i ++ {
248- insertRow := make ([]interface {}, 0 , len (values ))
249- for j := 0 ; j < size ; j ++ {
250- for _ , val := range values {
251- insertRow = append (insertRow , val .Value ())
252- }
317+ insertRow := make ([]string , 0 , len (values ))
318+ for _ , val := range values {
319+ insertRow = append (insertRow , val .String ())
253320 }
254321 rowsChan <- insertRow
255322 }
256323}
257324
258- func generateInsertStmt (table * tableparser.Table , size int ) string {
325+ func generateInsertStmt (table * tableparser.Table ) string {
259326 fields := getFieldNames (table .Fields )
260327 query := fmt .Sprintf ("INSERT IGNORE INTO %s.%s (%s) VALUES " ,
261328 backticks (table .Schema ),
262329 backticks (table .Name ),
263330 strings .Join (fields , "," ),
264331 )
265-
266- // Build the placeholders group for each row, including parenthesis: (?, ?, ...)
267- placeholders := "("
268- sep := ""
269- for i := 0 ; i < len (fields ); i ++ {
270- placeholders += sep + "?"
271- sep = ", "
272- }
273- placeholders += ")"
274-
275- // Join 'bulkSize' placeholders groups to the query
276- // INSERT INTO db.table (f1, f2, ...) VALUES (?, ?, ...), (?, ?, ...), ....
277- sep = ""
278- for i := 0 ; i < size ; i ++ {
279- query += sep + placeholders
280- sep = ", "
281- }
282-
283332 return query
284333}
285334
286- func runInsert (stmt * sql.Stmt , data [] interface {} , resultsChan chan int , sem chan bool , wg * sync.WaitGroup ) {
287- result , err := stmt .Exec (data ... )
335+ func runInsert (db * sql.DB , insertQuery string , resultsChan chan int , sem chan bool , wg * sync.WaitGroup ) {
336+ result , err := db .Exec (insertQuery )
288337 if err != nil {
289338 log .Debugf ("Cannot run insert: %s" , err )
290339 resultsChan <- 0
340+ sem <- true
291341 wg .Done ()
292342 return
293343 }
294344
295- rowsAffected , _ := result .RowsAffected ()
345+ rowsAffected , err := result .RowsAffected ()
346+ if err != nil {
347+ log .Errorf ("Cannot get rows affected after insert: %s" , err )
348+ }
296349 resultsChan <- int (rowsAffected )
297350 sem <- true
298351 wg .Done ()
0 commit comments