-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcsvgrep.py
More file actions
executable file
·136 lines (118 loc) · 3.81 KB
/
csvgrep.py
File metadata and controls
executable file
·136 lines (118 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python3
"""
Read in a CSV, dump any lines that match (or don't match) the provided regex on the line (or on the specified column).
Allows grepping a CSV that has multiline fields, which is not supported by raw grep.
"""
from defusedcsv import csv
import argparse
import re
import logging
# Module-level logger; its level is configured by logging.basicConfig() in main().
LOG = logging.getLogger(__name__)
# Sentinel for --column meaning "search every column of the row".
ANY_COLUMN = -1
def add_args(parser: argparse.ArgumentParser) -> None:
    """Register every csvgrep command-line argument on ``parser``.

    Each argument is described as a ``(flags, options)`` pair; the pairs are
    passed to ``parser.add_argument`` in the order they are listed here.
    """
    argument_specs = (
        (
            ("match",),
            {
                "type": str,
                "help": "The regex to match.",
            },
        ),
        (
            ("-i", "--input"),
            {
                "type": argparse.FileType("r"),
                "default": "-",
                "help": "Input CSV file. Default is stdin.",
            },
        ),
        (
            ("-o", "--output"),
            {
                "type": argparse.FileType("w"),
                "default": "-",
                "help": "Output CSV file. Default is stdout.",
            },
        ),
        (
            ("-c", "--column"),
            {
                "type": int,
                "default": ANY_COLUMN,
                "help": "The column to grep on, 0-indexed. Default is -1, which greps all columns.",
            },
        ),
        (
            ("-n", "--no-header"),
            {
                "action": "store_true",
                "help": "If set, the first line of the input CSV will be treated as data, not a header.",
            },
        ),
        (
            ("-V", "--invert-match"),
            {
                "action": "store_true",
                "help": "Invert the match. If set, lines that do not match will be dumped.",
            },
        ),
        (
            ("-F", "--fixed-strings"),
            {
                "action": "store_true",
                "help": "Interpret the match as a fixed string, not a regex.",
            },
        ),
        (
            ("--field-size-limit",),
            {
                "type": int,
                # Default to whatever limit the csv module currently reports.
                "default": csv.field_size_limit(),
                "help": "The maximum size of a single CSV field.",
            },
        ),
        (
            ("-d", "--debug"),
            {
                "action": "store_true",
                "help": "Enable debug logging.",
            },
        ),
    )
    for flags, options in argument_specs:
        parser.add_argument(*flags, **options)
def _row_matches(regex: re.Pattern, row: list, column: int) -> bool:
    """Return True if ``regex`` is found in ``row``.

    When ``column`` is ``ANY_COLUMN`` every cell is searched; otherwise only
    ``row[column]`` is searched.

    Raises:
        IndexError: if ``column`` does not exist in ``row``.
    """
    if column == ANY_COLUMN:
        return any(regex.search(cell) for cell in row)
    return regex.search(row[column]) is not None


def main() -> None:
    """Command-line entry point: filter CSV rows by a regex.

    Parses the command line, reads rows from the input CSV and writes to the
    output CSV every row that matches the pattern (or every row that does not
    match, with ``--invert-match``). Unless ``--no-header`` is given, the
    first row is copied to the output unconditionally and is not counted.

    If a specific column was requested and the first row does not have that
    column, an error is logged and nothing is written. Later rows that lack
    the requested column are logged and skipped, in both normal and inverted
    mode.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    add_args(parser)
    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)

    # Only ever raise the csv field size limit; a smaller request is ignored.
    if csv.field_size_limit() < args.field_size_limit:
        csv.field_size_limit(args.field_size_limit)

    reader = csv.reader(args.input)
    writer = csv.writer(args.output)

    # A fixed string is searched by escaping it into a literal regex.
    pattern = re.escape(args.match) if args.fixed_strings else args.match
    regex = re.compile(pattern)
    LOG.debug("Using regex: %s", regex.pattern)

    lines = 0
    for index, row in enumerate(reader):
        if index == 0:
            if args.column != ANY_COLUMN and args.column >= len(row):
                LOG.error("First row %s has fewer columns than %s, cannot match on that column", row, args.column)
                return
            if not args.no_header:
                # Header row: pass through untouched, never matched or counted.
                writer.writerow(row)
                continue

        try:
            matched = _row_matches(regex, row, args.column)
        except IndexError:
            # Ragged row shorter than the requested column: report and skip it
            # regardless of whether the match is inverted.
            LOG.error("Row %s has fewer columns than %s, cannot match on that column", row, args.column)
            continue

        # Emit the row when the match result differs from the invert flag
        # (i.e. matched in normal mode, not matched in inverted mode).
        if matched != args.invert_match:
            writer.writerow(row)
            lines += 1

    LOG.info("Matched %s rows", lines)
# Run the command-line tool only when this file is executed as a script.
if __name__ == "__main__":
    main()