# -*- coding: utf-8 -*-
# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
# OpenLP - Open Source Lyrics Projection #
# --------------------------------------------------------------------------- #
# Copyright (c) 2008-2017 OpenLP Developers #
# --------------------------------------------------------------------------- #
# This program is free software; you can redistribute it and/or modify it #
# under the terms of the GNU General Public License as published by the Free #
# Software Foundation; version 2 of the License. #
# #
# This program is distributed in the hope that it will be useful, but WITHOUT #
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
# more details. #
# #
# You should have received a copy of the GNU General Public License along #
# with this program; if not, write to the Free Software Foundation, Inc., 59 #
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
The :mod:`easyworship` module provides the functionality for importing EasyWorship song databases into OpenLP.
import os
import struct
import re
import zlib
import logging
import sqlite3
from openlp.core.lib import translate
from openlp.plugins.songs.lib import VerseType
from openlp.plugins.songs.lib import retrieve_windows_encoding, strip_rtf
from .songimport import SongImport
# regex: at least two newlines, can have spaces between them
SLIDE_BREAK_REGEX = re.compile(r'\n *?\n[\n ]*')
NUMBER_REGEX = re.compile(r'[0-9]+')
NOTE_REGEX = re.compile(r'\(.*?\)')
log = logging.getLogger(__name__)
[docs]class FieldDescEntry:
def __init__(self, name, field_type, size):
self.name = name
self.field_type = field_type
self.size = size
[docs]class FieldType(object):
An enumeration class for different field types that can be expected in an EasyWorship song file.
String = 1
Int16 = 3
Int32 = 4
Logical = 9
Memo = 0x0c
Blob = 0x0d
Timestamp = 0x15
[docs]class EasyWorshipSongImport(SongImport):
The :class:`EasyWorshipSongImport` class provides OpenLP with the
ability to import EasyWorship song files.
def __init__(self, manager, **kwargs):
super(EasyWorshipSongImport, self).__init__(manager, **kwargs)
self.entry_error_log = ''
[docs] def do_import(self):
Determines the type of file to import and calls the appropiate method
if self.import_source.lower().endswith('ews'):
elif self.import_source.endswith('DB'):
[docs] def import_ews(self):
Import the songs from service file
The full spec of the ews files can be found here:
or here: http://wiki.openlp.org/Development:EasyWorship_EWS_Format
# Open ews file if it exists
if not os.path.isfile(self.import_source):
log.debug('Given ews file does not exists.')
# Make sure there is room for at least a header and one entry
if os.path.getsize(self.import_source) < 892:
log.debug('Given ews file is to small to contain valid data.')
# Take a stab at how text is encoded
self.encoding = 'cp1252'
self.encoding = retrieve_windows_encoding(self.encoding)
if not self.encoding:
log.debug('No encoding set.')
self.ews_file = open(self.import_source, 'rb')
# EWS header, version '1.6'/' 3'/' 5':
# Offset Field Data type Length Details
# --------------------------------------------------------------------------------------------------
# 0 Filetype string 38 Specifies the file type and version.
# "EasyWorship Schedule File Version 1.6" or
# "EasyWorship Schedule File Version 3" or
# "EasyWorship Schedule File Version 5"
# 40/48/56 Entry count int32le 4 Number of items in the schedule
# 44/52/60 Entry length int16le 2 Length of schedule entries: 0x0718 = 1816
# Get file version
type, = struct.unpack('<38s', self.ews_file.read(38))
version = type.decode()[-3:]
# Set fileposition based on filetype/version
file_pos = 0
if version == ' 5':
file_pos = 56
elif version == ' 3':
file_pos = 48
elif version == '1.6':
file_pos = 40
log.debug('Given ews file is of unknown version.')
entry_count = self.ews_get_i32(file_pos)
entry_length = self.ews_get_i16(file_pos + 4)
file_pos += 6
# Loop over songs
for i in range(entry_count):
# Load EWS entry metadata:
# Offset Field Data type Length Details
# ------------------------------------------------------------------------------------------------
# 0 Title cstring 50
# 307 Author cstring 50
# 358 Copyright cstring 100
# 459 Administrator cstring 50
# 800 Content pointer int32le 4 Position of the content for this entry.
# 820 Content type int32le 4 0x01 = Song, 0x02 = Scripture, 0x03 = Presentation,
# 0x04 = Video, 0x05 = Live video, 0x07 = Image,
# 0x08 = Audio, 0x09 = Web
# 1410 Song number cstring 10
self.title = self.ews_get_string(file_pos + 0, 50)
authors = self.ews_get_string(file_pos + 307, 50)
copyright = self.ews_get_string(file_pos + 358, 100)
admin = self.ews_get_string(file_pos + 459, 50)
cont_ptr = self.ews_get_i32(file_pos + 800)
cont_type = self.ews_get_i32(file_pos + 820)
self.ccli_number = self.ews_get_string(file_pos + 1410, 10)
# Only handle content type 1 (songs)
if cont_type != 1:
file_pos += entry_length
# Load song content
# Offset Field Data type Length Details
# ------------------------------------------------------------------------------------------------
# 0 Length int32le 4 Length (L) of content, including the compressed content
# and the following fields (14 bytes total).
# 4 Content string L-14 Content compressed with deflate.
# Checksum int32be 4 Alder-32 checksum.
# (unknown) 4 0x51 0x4b 0x03 0x04
# Content length int32le 4 Length of content after decompression
content_length = self.ews_get_i32(cont_ptr)
deflated_content = self.ews_get_bytes(cont_ptr + 4, content_length - 10)
deflated_length = self.ews_get_i32(cont_ptr + 4 + content_length - 6)
inflated_content = zlib.decompress(deflated_content, 15, deflated_length)
if copyright:
self.copyright = copyright
if admin:
if copyright:
self.copyright += ', '
self.copyright += translate('SongsPlugin.EasyWorshipSongImport',
'Administered by {admin}').format(admin=admin)
# Set the SongImport object members.
self.set_song_import_object(authors, inflated_content)
if self.stop_import_flag:
if self.entry_error_log:
'"{title}" could not be imported. {entry}').format(title=self.title,
self.entry_error_log = ''
elif not self.finish():
# Set file_pos for next entry
file_pos += entry_length
[docs] def import_db(self):
Import the songs from the database
# Open the DB and MB files if they exist
import_source_mb = self.import_source.replace('.DB', '.MB')
if not os.path.isfile(self.import_source):
self.log_error(self.import_source, translate('SongsPlugin.EasyWorshipSongImport',
'This file does not exist.'))
if not os.path.isfile(import_source_mb):
self.log_error(self.import_source, translate('SongsPlugin.EasyWorshipSongImport',
'Could not find the "Songs.MB" file. It must be in the same '
'folder as the "Songs.DB" file.'))
db_size = os.path.getsize(self.import_source)
if db_size < 0x800:
self.log_error(self.import_source, translate('SongsPlugin.EasyWorshipSongImport',
'This file is not a valid EasyWorship database.'))
db_file = open(self.import_source, 'rb')
self.memo_file = open(import_source_mb, 'rb')
# Don't accept files that are clearly not paradox files
record_size, header_size, block_size, first_block, num_fields = struct.unpack('<hhxb8xh17xh', db_file.read(35))
if header_size != 0x800 or block_size < 1 or block_size > 4:
self.log_error(self.import_source, translate('SongsPlugin.EasyWorshipSongImport',
'This file is not a valid EasyWorship database.'))
# Take a stab at how text is encoded
self.encoding = 'cp1252'
code_page, = struct.unpack('<h', db_file.read(2))
if code_page == 852:
self.encoding = 'cp1250'
# The following codepage to actual encoding mappings have not been
# observed, but merely guessed. Actual example files are needed.
elif code_page == 737:
self.encoding = 'cp1253'
elif code_page == 775:
self.encoding = 'cp1257'
elif code_page == 855:
self.encoding = 'cp1251'
elif code_page == 857:
self.encoding = 'cp1254'
elif code_page == 866:
self.encoding = 'cp1251'
elif code_page == 869:
self.encoding = 'cp1253'
elif code_page == 862:
self.encoding = 'cp1255'
elif code_page == 874:
self.encoding = 'cp874'
self.encoding = retrieve_windows_encoding(self.encoding)
if not self.encoding:
self.log_error(self.import_source, translate('SongsPlugin.EasyWorshipSongImport',
'Could not retrieve encoding.'))
# Read the field description information
field_info = db_file.read(num_fields * 2)
db_file.seek(4 + (num_fields * 4) + 261, os.SEEK_CUR)
field_names = db_file.read(header_size - db_file.tell()).split(b'\0', num_fields)
field_descriptions = []
for i, field_name in enumerate(field_names):
field_type, field_size = struct.unpack_from('BB', field_info, i * 2)
field_descriptions.append(FieldDescEntry(field_name, field_type, field_size))
# Pick out the field description indexes we will need
success = True
fi_title = self.db_find_field(b'Title')
fi_author = self.db_find_field(b'Author')
fi_copy = self.db_find_field(b'Copyright')
fi_admin = self.db_find_field(b'Administrator')
fi_words = self.db_find_field(b'Words')
fi_ccli = self.db_find_field(b'Song Number')
except IndexError:
# This is the wrong table
success = False
# There does not appear to be a _reliable_ way of getting the number of songs/records, so loop through the file
# blocks and total the number of records. Store the information in a list so we dont have to do all this again.
cur_block = first_block
total_count = 0
block_list = []
while cur_block != 0 and success:
cur_block_pos = header_size + ((cur_block - 1) * 1024 * block_size)
cur_block, rec_count = struct.unpack('<h2xh', db_file.read(6))
rec_count = (rec_count + record_size) // record_size
block_list.append((cur_block_pos, rec_count))
total_count += rec_count
for block in block_list:
cur_block_pos, rec_count = block
db_file.seek(cur_block_pos + 6)
# Loop through each record within the current block
for i in range(rec_count):
if self.stop_import_flag:
raw_record = db_file.read(record_size)
self.fields = self.record_structure.unpack(raw_record)
self.title = self.db_get_field(fi_title).decode(self.encoding)
# Get remaining fields.
copy = self.db_get_field(fi_copy)
admin = self.db_get_field(fi_admin)
ccli = self.db_get_field(fi_ccli)
authors = self.db_get_field(fi_author)
words = self.db_get_field(fi_words)
if copy:
self.copyright = copy.decode(self.encoding)
if admin:
if copy:
self.copyright += ', '
self.copyright += translate('SongsPlugin.EasyWorshipSongImport',
'Administered by {admin}').format(admin=admin.decode(self.encoding))
if ccli:
self.ccli_number = ccli.decode(self.encoding)
if authors:
authors = authors.decode(self.encoding)
authors = ''
# Set the SongImport object members.
self.set_song_import_object(authors, words)
if self.stop_import_flag:
if self.entry_error_log:
'"{title}" could not be imported. '
'{entry}').format(title=self.title, entry=self.entry_error_log))
self.entry_error_log = ''
elif not self.finish():
except Exception as e:
'"{title}" could not be imported. {error}').format(title=self.title,
def _find_file(self, base_path, path_list):
Find the specified file, with the option of the file being at any level in the specified directory structure.
:param base_path: the location search in
:param path_list: the targeted file, preceded by directories that may be their parents relative to the base_path
:return: path for targeted file
target_file = ''
while len(path_list) > 0:
target_file = os.path.join(path_list[-1], target_file)
path_list = path_list[:len(path_list) - 1]
full_path = os.path.join(base_path, target_file)
full_path = full_path[:len(full_path) - 1]
if os.path.isfile(full_path):
return full_path
return ''
[docs] def import_sqlite_db(self):
Import the songs from an EasyWorship 6 SQLite database
songs_db_path = self._find_file(self.import_source, ["Databases", "Data", "Songs.db"])
song_words_db_path = self._find_file(self.import_source, ["Databases", "Data", "SongWords.db"])
invalid_dir_msg = 'This does not appear to be a valid Easy Worship 6 database directory.'
# check to see if needed files are there
if not os.path.isfile(songs_db_path):
self.log_error(songs_db_path, translate('SongsPlugin.EasyWorshipSongImport', invalid_dir_msg))
if not os.path.isfile(song_words_db_path):
self.log_error(song_words_db_path, translate('SongsPlugin.EasyWorshipSongImport', invalid_dir_msg))
# get database handles
songs_conn = sqlite3.connect(songs_db_path)
words_conn = sqlite3.connect(song_words_db_path)
if songs_conn is None or words_conn is None:
self.log_error(self.import_source, translate('SongsPlugin.EasyWorshipSongImport',
'This is not a valid Easy Worship 6 database.'))
songs_db = songs_conn.cursor()
words_db = words_conn.cursor()
if songs_conn is None or words_conn is None:
self.log_error(self.import_source, translate('SongsPlugin.EasyWorshipSongImport',
'This is not a valid Easy Worship 6 database.'))
# Take a stab at how text is encoded
self.encoding = 'cp1252'
self.encoding = retrieve_windows_encoding(self.encoding)
if not self.encoding:
log.debug('No encoding set.')
# import songs
songs = songs_db.execute('SELECT rowid,title,author,copyright,vendor_id FROM song;')
for song in songs:
song_id = song[0]
# keep extra copy of title for error message because error check clears it
self.title = title = song[1]
self.author = song[2]
self.copyright = song[3]
self.ccli_number = song[4]
words = words_db.execute('SELECT words FROM word WHERE song_id = ?;', (song_id,))
self.set_song_import_object(self.author, words.fetchone()[0].encode())
if not self.finish():
'"{title}" could not be imported. {entry}').
format(title=title, entry=self.entry_error_log))
# close database handles
[docs] def set_song_import_object(self, authors, words):
Set the SongImport object members.
:param authors: String with authons
:param words: Bytes with rtf-encoding
if authors:
# Split up the authors
author_list = authors.split('/')
if len(author_list) < 2:
author_list = authors.split(';')
if len(author_list) < 2:
author_list = authors.split(',')
for author_name in author_list:
if words:
# Format the lyrics
result = None
decoded_words = None
decoded_words = words.decode()
except UnicodeDecodeError:
# The unicode chars in the rtf was not escaped in the expected manner
self.entry_error_log = translate('SongsPlugin.EasyWorshipSongImport',
'Unexpected data formatting.')
result = strip_rtf(decoded_words, self.encoding)
if result is None:
self.entry_error_log = translate('SongsPlugin.EasyWorshipSongImport',
'No song text found.')
words, self.encoding = result
verse_type = VerseType.tags[VerseType.Verse]
for verse in SLIDE_BREAK_REGEX.split(words):
verse = verse.strip()
if not verse:
verse_split = verse.split('\n', 1)
first_line_is_tag = False
# EW tags: verse, chorus, pre-chorus, bridge, tag,
# intro, ending, slide
for tag in VerseType.names + ['tag', 'slide', 'end']:
tag = tag.lower()
ew_tag = verse_split[0].strip().lower()
if ew_tag.startswith(tag):
verse_type = tag[0]
if tag == 'tag' or tag == 'slide':
verse_type = VerseType.tags[VerseType.Other]
first_line_is_tag = True
number_found = False
# check if tag is followed by number and/or note
if len(ew_tag) > len(tag):
match = NUMBER_REGEX.search(ew_tag)
if match:
number = match.group()
verse_type += number
number_found = True
match = NOTE_REGEX.search(ew_tag)
if match:
self.comments += ew_tag + '\n'
if not number_found:
verse_type += '1'
# If the verse only consist of the tag-line, add an empty line to create an empty slide
if first_line_is_tag and len(verse_split) == 1:
self.add_verse(verse_split[-1].strip() if first_line_is_tag else verse, verse_type)
if len(self.comments) > 5:
self.comments += str(translate('SongsPlugin.EasyWorshipSongImport',
'\n[above are Song Tags with notes imported from EasyWorship]'))
[docs] def db_find_field(self, field_name):
Find a field in the descriptions
:param field_name: field to find
return [i for i, x in enumerate(self.field_descriptions) if x.name == field_name][0]
[docs] def db_set_record_struct(self, field_descriptions):
Save the record structure
:param field_descriptions: An array of field descriptions
# Begin with empty field struct list
fsl = ['>']
for field_desc in field_descriptions:
if field_desc.field_type == FieldType.String:
elif field_desc.field_type == FieldType.Int16:
elif field_desc.field_type == FieldType.Int32:
elif field_desc.field_type == FieldType.Logical:
elif field_desc.field_type == FieldType.Memo:
elif field_desc.field_type == FieldType.Blob:
elif field_desc.field_type == FieldType.Timestamp:
self.record_structure = struct.Struct(''.join(fsl))
self.field_descriptions = field_descriptions
[docs] def db_get_field(self, field_desc_index):
Extract the field
:param field_desc_index: Field index value
:return: The field value
field = self.fields[field_desc_index]
field_desc = self.field_descriptions[field_desc_index]
# Return None in case of 'blank' entries
if isinstance(field, bytes):
if not field.rstrip(b'\0'):
return None
elif field == 0:
return None
# Format the field depending on the field type
if field_desc.field_type == FieldType.String:
return field.rstrip(b'\0')
elif field_desc.field_type == FieldType.Int16:
return field ^ 0x8000
elif field_desc.field_type == FieldType.Int32:
return field ^ 0x80000000
elif field_desc.field_type == FieldType.Logical:
return field ^ 0x80 == 1
elif field_desc.field_type == FieldType.Memo or field_desc.field_type == FieldType.Blob:
block_start, blob_size = struct.unpack_from('<II', field, len(field) - 10)
sub_block = block_start & 0xff
block_start &= ~0xff
memo_block_type, = struct.unpack('b', self.memo_file.read(1))
if memo_block_type == 2:
self.memo_file.seek(8, os.SEEK_CUR)
elif memo_block_type == 3:
if sub_block > 63:
return b''
self.memo_file.seek(11 + (5 * sub_block), os.SEEK_CUR)
sub_block_start, = struct.unpack('B', self.memo_file.read(1))
self.memo_file.seek(block_start + (sub_block_start * 16))
return b''
return self.memo_file.read(blob_size)
return 0
[docs] def ews_get_bytes(self, pos, length):
Get bytes from ews_file
:param pos: Position to read from
:param length: Bytes to read
:return: Bytes read
return self.ews_file.read(length)
[docs] def ews_get_string(self, pos, length):
Get string from ews_file
:param pos: Position to read from
:param length: Characters to read
:return: String read
bytes = self.ews_get_bytes(pos, length)
mask = '<' + str(length) + 's'
byte_str, = struct.unpack(mask, bytes)
return byte_str.decode(self.encoding).replace('\0', '').strip()
[docs] def ews_get_i16(self, pos):
Get short int from ews_file
:param pos: Position to read from
:return: Short integer read
bytes = self.ews_get_bytes(pos, 2)
mask = '<h'
number, = struct.unpack(mask, bytes)
return number
[docs] def ews_get_i32(self, pos):
Get long int from ews_file
:param pos: Position to read from
:return: Long integer read
bytes = self.ews_get_bytes(pos, 4)
mask = '<i'
number, = struct.unpack(mask, bytes)
return number