
Example of using BSPump with CSV files

In the previous article, I introduced the stream-analyzer library called BSPump that originated during one of our workshops. Its uses range from simple data processing to real-time analysis, metric computation, and anomaly detection. However, it may all sound abstract until you see the basic rules and elegance of its implementation. That is exactly what I am going to show you in this article.

Let us say we have a CSV file, no, wait, a lot of CSV files that arrive in our storage directory at blazing speed. We do not even know the names of the files; the only thing we know is that we need to process their data and deliver them to our database as quickly as possible. After a minute of thinking (and that is a lot of time for us software engineers), we come up with a solution: we need to write a simple program whose instances run on many computers, processors, or processor cores at once, so that together they can quickly process the CSV files in our storage directory (or, to be general, any other medium or endpoint). We need to make sure that each stored file is loaded by only one of these instances, that the data are processed and sent to the database asynchronously line by line, and that every instance of the program can keep handling files until there are none left. It seems like tough work, doesn't it? Not with BSPump.

Let's do it

First, we need to take a look at the BSPump GitHub repository, which you can find here: https://github.com/LibertyAces/BitSwanPump. There is an example file called bspump-csv.py that contains the groundwork we need to develop our solution. You may notice that there are just a few lines of code in the example – that is the core of BSPump's magic!
The example first creates an application object and stores it in a local variable:

import bspump

app = bspump.BSPumpApplication()
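Further down, the example wires this application up with a pipeline (described next) and starts it. Roughly, the relevant lines look like this – a sketch following the structure of bspump-csv.py:

svc = app.get_service("bspump.PumpService")
# Register the sample pipeline (defined below) with the pump service
svc.add_pipeline(SamplePipeline(app))
# Enter the asynchronous main loop; Ctrl+C stops the application
app.run()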

Since we need to obtain, process, and send data, we also need to create a so-called pipeline, which holds an ordered set of processors that get, modify, and pass data to one another in a specified order. You can see the code of the sample pipeline in the top part of the example. The pipeline is represented by the class named SamplePipeline, and the core of its functionality lies in the call to the self.build method.
Let us take a closer look:

self.Sink = bspump.file.FileCSVSink(app, self, config={'path': 'out.csv'})
self.build(
    # Source: reads the CSV file line by line and emits Python dictionaries
    bspump.file.FileCSVSource(app, self, config={
        'path': 'sample.csv',
        'delimiter': ';'
    }).on(bspump.trigger.RunOnceTrigger(app)),
    # Processor: prints every event to the screen
    bspump.common.PPrintProcessor(app, self),
    # Sink: writes the processed events to out.csv
    self.Sink
)

The first processor that we are going to pass to the pipeline is FileCSVSource, which is able to read a CSV file or files as Python dictionaries. The example specifies a single file named sample.csv; however, we can also specify a storage directory with more files. The simplest way to do it is to change the value of the path attribute from sample.csv to, let us say, ./data/*. This little change means that we are going to read all files from the data directory in our repository.

Now you may ask: if there are more instances of the program running, will they all read all the files present in the directory? The answer may surprise you, because the default behavior of BSPump makes sure that every file is processed only once, regardless of how many parallel instances are actually running. Each file is locked while one of the instances is reading it, and when the file has been processed, this state is indicated by a “-processed” suffix added to the filename, so no other instance will try to read it again. BSPump simply takes care of it. The only thing we need to do is to remove the RunOnceTrigger, which calls the pipeline exactly once, and use, for example, the OpportunisticTrigger, which keeps restarting the pipeline until there are no files available – as shown in the sketch below.
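Putting both changes together, the modified source might look like this (a sketch based on the example above; the ./data/* path is just an illustration):

bspump.file.FileCSVSource(app, self, config={
    # Read every file in the data directory, not just a single file
    'path': './data/*',
    'delimiter': ';'
}).on(bspump.trigger.OpportunisticTrigger(app))  # restart until no files remain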

The sample pipeline then contains a processor that does nothing but print the output data to the screen, and a CSV sink that saves the data to a specified output CSV file. We can of course use our own processors and sinks, for example the ready-made ElasticSearch sink:

bspump.elasticsearch.ElasticSearchSink(app, self, es_connection, config={
    "index_prefix": "YOUR_INDEX_PREFIX",
    "doctype": "YOUR_DOCTYPE"
})

The ElasticSearch sink receives a connection (as a parameter) that handles the communication between our program and an ElasticSearch instance running at a specified endpoint (URL). If you are interested in this feature and its configuration, please see the ElasticSearch example file called bspump-es.py in the BSPump repository. If we combine both examples together, we nearly obtain our desired solution.
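For completeness, the connection itself is created and registered with the pump service before the pipeline is built. A minimal sketch, following the pattern used in bspump-es.py (the connection name "ESConnection" is just an illustration; the endpoint URL comes from the configuration):

svc = app.get_service("bspump.PumpService")
# Create the ElasticSearch connection and make it available to pipelines
es_connection = bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
svc.add_connection(es_connection)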

Build your own processor

Now we can use our own processor instead of the previously mentioned PPrintProcessor.
Its code is quite simple:

from bspump import Processor

class PrintProcessor(Processor):
    def process(self, context, event):
        # Print the incoming event and pass it on unchanged
        print(event)
        return event

As you can see, all it really does is print the input data represented by the event parameter. Creating your own processor is just as straightforward. In our example, the event parameter contains one line of the processed CSV file (whose name can be obtained through the context variable), and the line's data are by default represented as an ordered Python dictionary. We can now create our own class and include it in our sample pipeline, as sketched below.

And that's it. There is nothing more you have to do to create a functioning program that asynchronously reads CSV files, performs your specified transformations, and delivers the data to a single database or another type of storage. You can of course move each class to a separate file, create Python modules, comment methods, and make the code clean and crystal clear to other developers in your company.
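For instance, a hypothetical processor that upper-cases one column of every CSV line might look like this (the column name 'name' is an assumption for illustration; replace it with a column from your own data):

class UpperCaseProcessor(Processor):
    def process(self, context, event):
        # 'event' is one CSV line as an ordered dictionary
        if 'name' in event:
            event['name'] = event['name'].upper()  # hypothetical column
        return event

To use it, simply replace bspump.common.PPrintProcessor(app, self) in the self.build call with UpperCaseProcessor(app, self).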

Image: Visualization of processed sample data loaded from CSV files with BSPump.

Conclusion

I hope this simple example illustrates how easy it is to create a seemingly complex application with BSPump, pipelines, and the asynchronous approach. If you have any ideas on how we can improve the concept or extend the capabilities of BSPump or the ASAB framework that BSPump is built upon, please let us know or contribute to the code on GitHub: https://github.com/LibertyAces/BitSwanPump


About the Author

Premysl Cerny

Software Developer at TeskaLabs



