Context

A Context describes the execution setup. Instantiating it with the default arguments, Context(), gives the most lightweight setup: all data stays in the local thread and is never serialized or deserialized.

To process the data in parallel, you can pass a pool from the multiprocessing module. Because the default pickle serializer cannot handle every function (lambdas, for example), you can serialize all functions with cloudpickle instead. A common instantiation with multiprocessing looks like this:

import multiprocessing, pickle
import cloudpickle, pysparkling

sc = pysparkling.Context(
    multiprocessing.Pool(4),
    serializer=cloudpickle.dumps,
    deserializer=pickle.loads,
)

This setup assumes that your data is serializable with pickle, which is generally faster than cloudpickle. You can also specify a custom serializer and deserializer for the data with data_serializer and data_deserializer.
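
To make the pickle limitation concrete, here is a minimal standard-library sketch. It uses neither pysparkling nor cloudpickle, and the helper name survives_pickle is hypothetical, introduced only for illustration. Plain data round-trips through pickle, while a lambda does not, which is the usual reason for passing cloudpickle.dumps as the function serializer.

```python
import pickle


def survives_pickle(obj):
    """Return True if obj can be pickled and unpickled (hypothetical helper)."""
    try:
        pickle.loads(pickle.dumps(obj))
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# Plain data is handled by pickle without trouble.
data_ok = survives_pickle(['hello', 'world'])

# A lambda has no importable name, so pickle refuses to serialize it.
lambda_ok = survives_pickle(lambda x: x * 2)
```

In this sketch data_ok ends up True and lambda_ok ends up False.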

class pysparkling.Context(pool=None, serializer=None, deserializer=None, data_serializer=None, data_deserializer=None, max_retries=3, retry_wait=0.0, cache_manager=None, catch_exceptions=False)

Context object similar to a Spark Context.

The variable _stats contains measured timing information about data and function (de)serialization and workload execution to benchmark your jobs.

Parameters:
  • pool – An instance with a map(func, iterable) method.
  • serializer – Serializer for functions. Examples are pickle.dumps and cloudpickle.dumps.
  • deserializer – Deserializer for functions. For example pickle.loads.
  • data_serializer – Serializer for the data.
  • data_deserializer – Deserializer for the data.
  • max_retries (int) – Maximum number of times a partition is retried.
  • retry_wait (float) – Seconds to wait between retries.
  • cache_manager – Custom cache manager (like TimedCacheManager).
  • catch_exceptions – Whether to catch and silence user space exceptions.
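
The pool parameter only asks for an object with a map(func, iterable) method. The following sketch illustrates that interface with the standard library alone; it does not show how Context uses the pool internally. SerialPool and total_per_partition are hypothetical names made up for this example.

```python
from concurrent.futures import ThreadPoolExecutor


class SerialPool:
    """Hypothetical pool-like object exposing only map(func, iterable)."""

    def map(self, func, iterable):
        return [func(item) for item in iterable]


def total_per_partition(pool, partitions):
    """Apply sum() to every partition through the pool's map method."""
    return list(pool.map(sum, partitions))


partitions = [[1, 2], [3, 4], [5]]

# Any object with a compatible map method can stand in for the pool.
serial_result = total_per_partition(SerialPool(), partitions)
with ThreadPoolExecutor(max_workers=2) as executor:
    threaded_result = total_per_partition(executor, partitions)
```

Both calls produce the same per-partition sums, because the calling code relies only on the map method.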

accumulator(value, accum_param=None)

Create an Accumulator with the given initial value, using a given AccumulatorParam helper object to define how to add values of the data type if provided. Default AccumulatorParams are used for integers and floating-point numbers if you do not provide one. For other types, a custom AccumulatorParam can be used.
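
As a rough sketch of what a custom helper for another type might look like, the class below follows the zero/addInPlace method names of PySpark's AccumulatorParam interface. ListParam is a hypothetical name, and the loop only imitates how updates would be merged; it does not use pysparkling itself.

```python
class ListParam:
    """Hypothetical helper following PySpark's AccumulatorParam interface."""

    def zero(self, value):
        # Neutral element of the same type as the initial value.
        return []

    def addInPlace(self, value1, value2):
        # Merge value2 into value1 and return the merged result.
        value1.extend(value2)
        return value1


param = ListParam()
total = param.zero(['seed'])
for update in (['a'], ['b', 'c']):
    total = param.addInPlace(total, update)
```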

parallelize(x, numSlices=None)

Parallelize x.

Parameters:
  • x – An iterable (e.g. a list) that represents the data.
  • numSlices (int) – The number of partitions the data should be split into. A partition is a unit of data that is processed at a time.
Return type:

RDD
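
To picture what numSlices means, here is one possible way to split an iterable into contiguous partitions. This is an illustrative sketch only: split_into_slices is a hypothetical helper, and the exact partition boundaries chosen by pysparkling may differ.

```python
def split_into_slices(items, num_slices):
    """Split items into num_slices contiguous chunks of near-equal size."""
    items = list(items)
    base, extra = divmod(len(items), num_slices)
    slices, start = [], 0
    for i in range(num_slices):
        # The first `extra` slices receive one additional element.
        end = start + base + (1 if i < extra else 0)
        slices.append(items[start:end])
        start = end
    return slices


slices = split_into_slices(range(10), 3)
```

Each resulting slice stands for one partition, that is, one unit of data processed at a time.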

pickleFile(name, minPartitions=None)

Read a pickle file.

Reads files created with RDD.saveAsPickleFile() into an RDD.

Parameters:
  • name – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and * and multiple expressions separated by ,.
  • minPartitions – (optional) By default, every file is a partition, but this option allows splitting files further.
Return type:

RDD

Example with a serialized list:

>>> import pickle
>>> from pysparkling import Context
>>> from tempfile import NamedTemporaryFile
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> with open(tmpFile.name, 'wb') as f:
...     pickle.dump(['hello', 'world'], f)
>>> Context().pickleFile(tmpFile.name).collect()[0] == 'hello'
True

runJob(rdd, func, partitions=None, allowLocal=False, resultHandler=None)

This function is used by methods in the RDD.

Note that the maps are only inside generators and the resultHandler needs to take care of executing the ones that it needs. In other words, if you need everything to be executed, the resultHandler needs to be at least lambda x: list(x) to trigger execution of the generators.

Parameters:
  • func – Map function with signature func(TaskContext, Iterator over elements).
  • partitions – List of partitions that are involved. None means the map job is applied to all partitions.
  • allowLocal – Allows local execution.
  • resultHandler – Process the result from the maps.
Returns:

Result of resultHandler.

Return type:

list
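
The note about generators can be illustrated without pysparkling. In this sketch the names double, lazy_maps and run are hypothetical, and handing the handler one generator per partition is an assumption made for illustration. It shows that the mapped function only runs for the partitions the handler actually consumes.

```python
calls = []


def double(x):
    calls.append(x)  # record that the function actually ran
    return 2 * x


def lazy_maps(partitions):
    """Return one unevaluated map object per partition."""
    return (map(double, part) for part in partitions)


def run(partitions, result_handler):
    """Hand the lazy maps to the handler, which decides what to execute."""
    return result_handler(lazy_maps(partitions))


# A handler that consumes only the first partition.
first_only = run([[1, 2], [3, 4]], lambda maps: list(next(iter(maps))))
calls_after_first = list(calls)

# A handler that consumes every partition.
everything = run([[1, 2], [3, 4]], lambda maps: [list(m) for m in maps])
```

After the first call, double has run only for the elements 1 and 2; the second partition was never touched.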

binaryFiles(path, minPartitions=None)

Read a binary file into an RDD.

Parameters:
  • path – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and * and multiple expressions separated by ,.
  • minPartitions – (optional) By default, every file is a partition, but this option allows splitting files further.
Return type:

RDD

Warning

Not part of PySpark API.

Setting up examples:

>>> import os, pysparkling
>>> from backports import tempfile
>>> sc = pysparkling.Context()
>>> decode = lambda bstring: bstring.decode()

Example with whole file:

>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(b'bellobello')
...     sc.binaryFiles(tmp+'*').values().map(decode).collect()
['bellobello']

binaryRecords(path, recordLength=None)

Read a binary file into an RDD.

Parameters:
  • path – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and * and multiple expressions separated by ,.
  • recordLength – If None, every file is a single record. An int means fixed-length records. A string is passed to struct as a format string to read the length prefix of variable-length binary records.
Return type:

RDD
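
The string form of recordLength describes a length prefix in struct notation. The sketch below shows how such length-prefixed records can be read with the standard library. It is not pysparkling's implementation, and read_length_prefixed is a hypothetical helper.

```python
import io
import struct


def read_length_prefixed(stream, length_format):
    """Yield records that are each preceded by a struct-encoded length."""
    prefix_size = struct.calcsize(length_format)
    while True:
        prefix = stream.read(prefix_size)
        if len(prefix) < prefix_size:
            return  # end of stream (or a truncated prefix)
        (length,) = struct.unpack(length_format, prefix)
        yield stream.read(length)


# Two records: a 4-byte little-endian length followed by the payload.
payload = (struct.pack('<I', 5) + b'bello'
           + struct.pack('<I', 10) + b'bellobello')
records = list(read_length_prefixed(io.BytesIO(payload), '<I'))
```

Here '<I' means an unsigned 4-byte little-endian integer, so each record is read as "4 bytes of length, then that many bytes of data".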

Warning

Only an int recordLength is part of the PySpark API.

Setting up examples:

>>> import os, struct, pysparkling
>>> from backports import tempfile
>>> sc = pysparkling.Context()
>>> decode = lambda bstring: bstring.decode()

Example with whole file:

>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(b'bellobello')
...     sc.binaryRecords(tmp+'*').map(decode).collect()
['bellobello']

Example with fixed length records:

>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(b'bellobello')
...     sc.binaryRecords(tmp+'*', recordLength=5).map(decode).collect()
['bello', 'bello']

Example with variable length records:

>>> with tempfile.TemporaryDirectory() as tmp:
...     with open(os.path.join(tmp, 'test.b'), 'wb') as f:
...         _ = f.write(struct.pack('<I', 5) + b'bello')
...         _ = f.write(struct.pack('<I', 10) + b'bellobello')
...     (sc.binaryRecords(tmp+'*', recordLength='<I')
...      .map(decode).collect())
['bello', 'bellobello']

textFile(filename, minPartitions=None, use_unicode=True)

Read a text file into an RDD.

Parameters:
  • filename – Location of a file. Can include schemes like http://, s3:// and file://, wildcard characters ? and * and multiple expressions separated by ,.
  • minPartitions – (optional) By default, every file is a partition, but this option allows splitting files further.
  • use_unicode – (optional, default=True) Use utf8 if True and ascii if False.
Return type:

RDD

union(rdds)

Create a union of rdds.

Parameters:
  • rdds – Iterable of RDDs.
Return type:

RDD

wholeTextFiles(path, minPartitions=None, use_unicode=True)

Read text files into an RDD of pairs of file name and file content.

Parameters:
  • path – Location of the files. Can include schemes like http://, s3:// and file://, wildcard characters ? and * and multiple expressions separated by ,.
  • minPartitions – (optional) By default, every file is a partition, but this option allows splitting files further.
  • use_unicode – (optional, default=True) Use utf8 if True and ascii if False.
Return type:

RDD
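
To show the shape of the records described above, this standard-library sketch builds pairs of file name and file content for a temporary directory. It does not call pysparkling. name_content_pairs is a hypothetical helper, and using the full path as the file name is an assumption.

```python
import os
import tempfile


def name_content_pairs(directory):
    """Return sorted (file name, file content) pairs for a directory."""
    pairs = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        with open(path, encoding='utf8') as f:
            pairs.append((path, f.read()))
    return pairs


with tempfile.TemporaryDirectory() as tmp:
    # Create two small text files to read back.
    for name, text in [('a.txt', 'hello'), ('b.txt', 'world')]:
        with open(os.path.join(tmp, name), 'w', encoding='utf8') as f:
            f.write(text)
    pairs = name_content_pairs(tmp)

names = [os.path.basename(path) for path, _ in pairs]
contents = [content for _, content in pairs]
```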