Skip to content

Commit 8db6693

Browse files
gamolinadhalperi
authored andcommitted
Initial implementation of SpannerIO.Write
This closes apache#2166.
1 parent 24c6ff4 commit 8db6693

7 files changed

Lines changed: 474 additions & 13 deletions

File tree

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.examples.spanner;
19+
20+
import com.google.cloud.spanner.Database;
21+
import com.google.cloud.spanner.DatabaseAdminClient;
22+
import com.google.cloud.spanner.Mutation;
23+
import com.google.cloud.spanner.Operation;
24+
import com.google.cloud.spanner.Spanner;
25+
import com.google.cloud.spanner.SpannerException;
26+
import com.google.cloud.spanner.SpannerOptions;
27+
import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
28+
import java.util.Collections;
29+
import org.apache.beam.sdk.Pipeline;
30+
import org.apache.beam.sdk.io.TextIO;
31+
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
32+
import org.apache.beam.sdk.options.Default;
33+
import org.apache.beam.sdk.options.Description;
34+
import org.apache.beam.sdk.options.PipelineOptions;
35+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
36+
import org.apache.beam.sdk.options.Validation;
37+
import org.apache.beam.sdk.transforms.DoFn;
38+
import org.apache.beam.sdk.transforms.ParDo;
39+
import org.apache.beam.sdk.values.PCollection;
40+
41+
42+
43+
/**
44+
* Generalized bulk loader for importing CSV files into Spanner.
45+
*
46+
*/
47+
public class SpannerCSVLoader {
48+
49+
/**
50+
* Command options specification.
51+
*/
52+
private interface Options extends PipelineOptions {
53+
@Description("Create a sample database")
54+
@Default.Boolean(false)
55+
boolean isCreateDatabase();
56+
void setCreateDatabase(boolean createDatabase);
57+
58+
@Description("File to read from ")
59+
@Validation.Required
60+
String getInput();
61+
void setInput(String value);
62+
63+
@Description("Instance ID to write to in Spanner")
64+
@Validation.Required
65+
String getInstanceId();
66+
void setInstanceId(String value);
67+
68+
@Description("Database ID to write to in Spanner")
69+
@Validation.Required
70+
String getDatabaseId();
71+
void setDatabaseId(String value);
72+
73+
@Description("Table name")
74+
@Validation.Required
75+
String getTable();
76+
void setTable(String value);
77+
}
78+
79+
80+
/**
81+
* Constructs and executes the processing pipeline based upon command options.
82+
*/
83+
public static void main(String[] args) throws Exception {
84+
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
85+
86+
Pipeline p = Pipeline.create(options);
87+
PCollection<String> lines = p.apply(TextIO.Read.from(options.getInput()));
88+
PCollection<Mutation> mutations = lines
89+
.apply(ParDo.of(new NaiveParseCsvFn(options.getTable())));
90+
mutations
91+
.apply(SpannerIO.writeTo(options.getInstanceId(), options.getDatabaseId()));
92+
p.run().waitUntilFinish();
93+
}
94+
95+
public static void createDatabase(Options options) {
96+
Spanner client = SpannerOptions.getDefaultInstance().getService();
97+
98+
DatabaseAdminClient databaseAdminClient = client.getDatabaseAdminClient();
99+
try {
100+
databaseAdminClient.dropDatabase(options.getInstanceId(), options
101+
.getDatabaseId());
102+
} catch (SpannerException e) {
103+
// Does not exist, ignore.
104+
}
105+
Operation<Database, CreateDatabaseMetadata> op = databaseAdminClient.createDatabase(
106+
options.getInstanceId(), options
107+
.getDatabaseId(), Collections.singleton("CREATE TABLE " + options.getTable() + " ("
108+
+ " Key INT64,"
109+
+ " Name STRING,"
110+
+ " Email STRING,"
111+
+ " Age INT,"
112+
+ ") PRIMARY KEY (Key)"));
113+
op.waitFor();
114+
}
115+
116+
117+
/**
118+
* A DoFn that creates a Spanner Mutation for each CSV line.
119+
*/
120+
static class NaiveParseCsvFn extends DoFn<String, Mutation> {
121+
private final String table;
122+
123+
NaiveParseCsvFn(String table) {
124+
this.table = table;
125+
}
126+
127+
@ProcessElement
128+
public void processElement(ProcessContext c) {
129+
String line = c.element();
130+
String[] elements = line.split(",");
131+
if (elements.length != 4) {
132+
return;
133+
}
134+
Mutation mutation = Mutation.newInsertOrUpdateBuilder(table)
135+
.set("Key").to(Long.valueOf(elements[0]))
136+
.set("Name").to(elements[1])
137+
.set("Email").to(elements[2])
138+
.set("Age").to(Integer.valueOf(elements[3]))
139+
.build();
140+
c.output(mutation);
141+
}
142+
}
143+
}

pom.xml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,15 @@
141141
<woodstox.version>4.4.1</woodstox.version>
142142
<spring.version>4.3.5.RELEASE</spring.version>
143143
<groovy-maven-plugin.version>2.0</groovy-maven-plugin.version>
144+
<api-common.version>1.0.0-rc2</api-common.version>
144145
<surefire-plugin.version>2.20</surefire-plugin.version>
145146
<failsafe-plugin.version>2.20</failsafe-plugin.version>
146147
<maven-resources-plugin.version>3.0.2</maven-resources-plugin.version>
147-
148+
148149
<compiler.error.flag>-Werror</compiler.error.flag>
149150
<compiler.default.pkginfo.flag>-Xpkginfo:always</compiler.default.pkginfo.flag>
150151
<compiler.default.exclude>nothing</compiler.default.exclude>
152+
<spanner.version>0.16.0-beta</spanner.version>
151153
</properties>
152154

153155
<packaging>pom</packaging>
@@ -854,6 +856,12 @@
854856
<version>${google-cloud-bigdataoss.version}</version>
855857
</dependency>
856858

859+
<dependency>
860+
<groupId>com.google.cloud</groupId>
861+
<artifactId>google-cloud-spanner</artifactId>
862+
<version>${spanner.version}</version>
863+
</dependency>
864+
857865
<dependency>
858866
<groupId>com.google.cloud.bigdataoss</groupId>
859867
<artifactId>util</artifactId>

sdks/java/core/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,8 @@
203203
<artifactId>joda-time</artifactId>
204204
</dependency>
205205

206-
<!-- To use org.apache.beam.io.AvroSource with XZ-encoded files, please explicitly
207-
declare this dependency to include org.tukaani:xz on the classpath at runtime. -->
206+
<!-- To use org.apache.beam.io.AvroSource with XZ-encoded files, please explicitly
207+
declare this dependency to include org.tukaani:xz on the classpath at runtime. -->
208208
<dependency>
209209
<groupId>org.tukaani</groupId>
210210
<artifactId>xz</artifactId>

sdks/java/io/google-cloud-platform/pom.xml

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,28 @@
7777
<artifactId>jackson-databind</artifactId>
7878
</dependency>
7979

80+
<dependency>
81+
<groupId>io.grpc</groupId>
82+
<artifactId>grpc-core</artifactId>
83+
</dependency>
84+
85+
<dependency>
86+
<groupId>com.google.api.grpc</groupId>
87+
<artifactId>grpc-google-common-protos</artifactId>
88+
<version>${grpc-google-common-protos.version}</version>
89+
</dependency>
90+
8091
<dependency>
8192
<groupId>com.google.apis</groupId>
8293
<artifactId>google-api-services-bigquery</artifactId>
8394
</dependency>
8495

96+
<dependency>
97+
<groupId>com.google.api</groupId>
98+
<artifactId>api-common</artifactId>
99+
<version>${api-common.version}</version>
100+
</dependency>
101+
85102
<dependency>
86103
<groupId>com.google.apis</groupId>
87104
<artifactId>google-api-services-pubsub</artifactId>
@@ -112,11 +129,6 @@
112129
<artifactId>grpc-auth</artifactId>
113130
</dependency>
114131

115-
<dependency>
116-
<groupId>io.grpc</groupId>
117-
<artifactId>grpc-core</artifactId>
118-
</dependency>
119-
120132
<dependency>
121133
<groupId>io.grpc</groupId>
122134
<artifactId>grpc-netty</artifactId>
@@ -151,6 +163,12 @@
151163
<artifactId>joda-time</artifactId>
152164
</dependency>
153165

166+
<dependency>
167+
<groupId>com.google.cloud</groupId>
168+
<artifactId>google-cloud-spanner</artifactId>
169+
<version>${spanner.version}</version>
170+
</dependency>
171+
154172
<dependency>
155173
<groupId>com.google.cloud.bigtable</groupId>
156174
<artifactId>bigtable-protos</artifactId>
@@ -186,11 +204,6 @@
186204
<artifactId>google-auth-library-oauth2-http</artifactId>
187205
</dependency>
188206

189-
<dependency>
190-
<groupId>com.google.api.grpc</groupId>
191-
<artifactId>grpc-google-common-protos</artifactId>
192-
</dependency>
193-
194207
<dependency>
195208
<groupId>org.slf4j</groupId>
196209
<artifactId>slf4j-api</artifactId>

0 commit comments

Comments
 (0)