Example of using BSPump with CSV files

In the previous article, I talked about the stream-analyzer library called BSPump that originated during one of our workshops. Its usage varies from simple data processors to real-time data analyzers, metric computation, and anomaly detection. However, it all may sound abstract to you unless you discover the basic rules and elegance of its implementation. So this is exactly what I am going to show you in the following article.

Let us say we have a CSV file, no, wait, a lot of CSV files that are coming to our storage directory with flashing speed. We even do not have the slightest notion what names of the files are, the only thing we know is that we need to process their data and deliver them to our database as quickly as possible. After a minute (and it is a lot of time for us software engineers) of thinking we come up with a solution: We need to write a simple program whose instances run on many computers, processors or processor cores at once, so they are together able to quickly process the CSV files in our storage directory (or any other media or endpoint to be general). We need to make sure that each stored file is loaded only by one of these instances, that the data are processed and sent to the database asynchronously line by line and that every instance of the program can handle lots of files until there are none. It seems like tough work, doesn’t it? Not with BSPump.

Let's do it

First, we need to take a look at the BSPump GitHub repository, which you can find here: There is an example file called that contains the basic work we need to develop our solution. You may notice that there are just a few lines of code in the example – that is the core of magic of BSPump!
The example first creates an application object and stores it in a local variable:

app = bspump.BSPumpApplication()

Since we need to obtain, process and send data, we also need to create the so-called pipeline that itself holds ordered objects of processors that get, modify and pass data to one another in a specified order. You can see the code of the sample pipeline in the top part of the example. The pipeline is represented by the class named SamplePipeline and the core of its function dwells in the method.
Let us take a closer look:

self.Sink = bspump.file.FileCSVSink(app, self, config={'path': 'out.csv'})
    bspump.file.FileCSVSource(app, self, config={
        'path': 'sample.csv',
        'delimiter': ';'
    bspump.common.PPrintProcessor(app, self),

The first processor that we are going to pass to the pipeline is FileCSVSource that is able to read a CSV file or files as Python dictionaries. In the example there is a specified file named sample.csv, however we can also specify a storage directory with more files. The simplest way how to do it is to modify the value of the path attribute from sample.csv to, let us say, ./data/*. This little change indicates that we are going to read all files from the data directory in our repository. Now you may ask: If there are more instances of the program running, will they all read all files present in the directory? The answer may surprise you, because the implicit behavior of the BSPump makes sure that every file is processed only once regardless of how many parallel instances are actually running. Each file is locked when one of the instances is reading it and when the file is processed, the appropriate state is indicated by the “-processed” suffix that is added to the filename, so no other instance will try to read it again. The BSPump will simply take care of it. The only thing we need to do is to remove the RunOnceTrigger that calls the pipeline exactly once, and use, for example, the OpportunisticTrigger, which keeps restarting the pipeline until there are no files available.

The sample pipeline then contains a processor that does nothing but prints the output data to the screen, and a CSV sink that saves the data to a specified output CSV file. We can of course use our own processors and sinks, for example the pre-prepared ElasticSearch sink:

bspump.elasticsearch.ElasticSearchSink(app, self, es_connection, config={
    "index_prefix": "YOUR_INDEX_PREFIX",
    "doctype": "YOUR_DOCTYPE"

The ElasicSearch sink receives a connection (as a parameter) that communicates between our program and the ElasticSearch that runs within a specified endpoint (URL). If you are more interested in this feature and its configuration, please see the ElasticSearch example file called in the BSPump repository. If we combine both examples together, we nearly obtain our desired solution.

Build your own processor

Now we can use our own processor instead of the previously mentioned PPrintProcessor.
Its code is quite simple:

class PrintProcessor(Processor):
    def process(self, context, event):
        return event

You can see that what it really does is that it only prints the input data that are represented by the event attribute. Creating your own processor is straightforward. In our example, the event attribute contains one line of the processed CSV file (whose name can be obtained through the context variable). This line’s data are implicitly represented by an ordered Python dictionary. We can now create our own class and include it in our sample pipeline. And that’s it. There is nothing more you have to do to create a functioning program that asynchronously reads CSV files, performs your specified transformations and delivers the data to a single database or other type of storage. You can of course move each class to a separate file, create Python modules, comment methods and make the code clean and crystal clear to other developers in your company.

Visualization of processed sample data loaded from CSV files with BSPump.

Image: Visualization of processed sample data loaded from CSV files with BSPump.


I hope this simple example illustrates how easy and simple it is to create seemingly complex application with BSPump, pipelines and asynchronous approach. If you have any ideas on how we can improve the concept or extend capabilities of the BSPump or the ASAB server the BSPump is based upon, please let us know or contribute to the code at GitHub:

Continue to next article

About the Author

Premysl Cerny

Software Developer at TeskaLabs

You Might Be Interested in Reading These Articles

Entangled ways of product development in the area of cybersecurity #1 - Asynchronous or parallel?

I started working at TeskaLabs at the beginning of autumn 2017 as a student at the Faculty of Information Technology of CTU. In the job advertisement, I was particularly interested in the fact that it is a small, product-based company that does not focus on just one technology or one programming language.

Continue reading ...

development tech premek

Published on November 15, 2022

Key Areas and Best Practices to Focus for Mobile API Security

With APIs (Application Programming Interfaces) becoming a crucial factor in any web or mobile application, security feels more like a journey than a destination. Of all the constituents that encompass an application, API gateway offers easy access points for a hacker to break in and steal your data. A single error in API can cause immense problems for any organization using your API.

Continue reading ...

security mobile development

Published on November 22, 2016

Inotify in ASAB Library

From blocking read challenge, ctypes and bitmasks to a solution that enables the ASAB framework to react to changes in the file system in real time.

Continue reading ...

asab development tech eliska

Published on August 15, 2023