Reading and Writing¶
This is a collection of best practices or templates for reading and writing various input and output formats.
Batch¶
Python List¶
The most direct input and output is from and to a Python list.
import pysparkling
sc = pysparkling.Context()
# reading
rdd = sc.parallelize(['hello', 'world'])
# back to Python list
print(rdd.collect())
# back to an iterator
rdd.toLocalIterator()
ND-JSON¶
Newline delimited JSON is a text file where every line is its own JSON string.
import json
import pysparkling
sc = pysparkling.Context()
# reading
rdd = (
sc
.textFile('input.json')
.map(json.loads)
)
# writing
(
rdd
.map(json.dumps)
.saveAsTextFile('output.json')
)
CSV¶
import csv
import io
import pysparkling
sc = pysparkling.Context()
# reading
rdd = (
sc
.textFile('input.csv')
.mapPartitions(csv.reader)
)
# writing
def csv_row(data):
s = io.StringIO()
csv.writer(s).writerow(data)
return s.getvalue()[:-1]
(
rdd
.map(csv_row)
.saveAsTextFile('output.csv')
)
TensorFlow Records¶
This example preprocesses example data into a TensorFlow Records file. The
second part is a cross check and prints the contents of the tfrecords
file.
import pysparkling
import tensorflow as tf
def to_tfrecord(self, xy):
X, y = xy
example = tf.train.Example(features=tf.train.Features(feature={
'X': tf.train.Feature(float_list=tf.train.FloatList(value=X)),
'y': tf.train.Feature(int64_list=tf.train.Int64List(value=y)),
}))
return example.SerializeToString()
# example
X = [1.2, 3.1, 8.7]
y = [2, 5]
# writing
sc = pysparkling.Context()
rdd = (
sc
.parallelize([(X, y)])
.map(to_tfrecord)
)
with tf.python_io.TFRecordWriter('out.tfrecords') as writer:
for example in rdd.toLocalIterator():
writer.write(example)
# debugging a tf records file
for serialized_example in tf.python_io.tf_record_iterator('out.tfrecords'):
example = tf.train.Example()
example.ParseFromString(serialized_example)
X = example.features.feature['X'].float_list.value
y = example.features.feature['y'].int64_list.value
print(X, y)